安装依赖

pip install findpeaks

使用

import numpy as npfrom findpeaks import findpeaksif __name__ == '__main__':# 初始化fp = findpeaks(method='peakdetect',  # 检测方式:一维数组【】二维数据【】whitelist=['valley'],  # 检测目标【峰peak,谷valley,峰谷['peak','valley']】lookahead=1,  # 前瞻性优化算法【数据量越少,此数字越小,比如50个数据,最好选择1或者2】interpolate=10,  # 插值,放大横坐标【数字越高,作图的边缘越不锋利】)# 数值data = np.asarray([97, 11, 27, 69, 39, 52, 84, 81, 92, 84, 83, 95, 10, 87, 72, 84, 36, 15, 85, 68, 60, 59, 61, 3,13, 12, 4, 80, 28, 53, 24, 32, 10, 2, 9, 57, 15, 66, 99, 26, 40, 63, 97, 22, 27, 98, 15, 84,76, 34])results = fp.fit(X=data,  # 数据x=None  # 作图的x轴坐标(默认0,1,2,3...))# 打印结果print(results)# 作图fp.plot()

使用案例:

from datetime import datetime, timedeltafrom findpeaks import findpeaks
from pandas import DataFramefrom db.config import session
from db.db_models import ReportCapacityEnergy
from utils.influxdb_util import InfluxdbUtil, InfluxdbStorageclass PeakValleyDto:def __init__(self, value, index, time, flag):"""峰谷值DTO:param value: 值:param index: 所在数组的索引:param time: 时间戳:param flag: 自定义标识"""self.value = valueself.index = indexself.time = timeself.flag = flagclass CapacityEnergyService:"""产能与能耗"""def __init__(self):"""初始化:"""# influxdb工具类self.influxdb_util = InfluxdbUtil(url="http://xxxxxxxxx.com/",token="xxxxxxxx",org="xxxx")@staticmethoddef check_out_peak_valley(capacity_data) -> list[PeakValleyDto]:"""检出的峰谷值:return:"""result: list[PeakValleyDto] = []m = map(lambda x: x.value, capacity_data)fp = findpeaks(method='peakdetect', whitelist=['peak', 'valley'], lookahead=1, interpolate=1)# 获取峰谷值fit: dict = fp.fit(X=list(m))# 原始数据df_: DataFrame = fit['df']# 平滑数据df_interp_: DataFrame = fit['df_interp']# 波谷df__valley_true_: DataFrame = df_[df_['valley'] == True]for valley in df__valley_true_.iterrows():row = valley[1]value = row['y']  # 值index = row['x']  # 索引time = capacity_data[row['x']].time  # 时间戳flag = 'valley'  # 标记result.append(PeakValleyDto(value, index, time, flag))# 波峰df__peak_true_: DataFrame = df_[df_['peak'] == True]for peak in df__peak_true_.iterrows():row = peak[1]value = row['y']  # 值index = row['x']  # 索引time = capacity_data[row['x']].time  # 时间戳flag = 'peak'  # 标记result.append(PeakValleyDto(value, index, time, flag))# 作图fp.plot()if len(result) > 0:sor = sorted(result, key=lambda x: x.time)# 起始点和结束点不是峰谷# sor[0].flag = 'first'# sor[-1].flag = 'last'sor[0] = PeakValleyDto(capacity_data[0].value, 0, capacity_data[0].time, 'first')sor[-1] = PeakValleyDto(capacity_data[-1].value, len(capacity_data) - 1,capacity_data[-1].time, 'last')return sorelse:if len(capacity_data) == 0:return [PeakValleyDto(0, 0, None, 'first'),PeakValleyDto(0, 0, None, 'last')]return [PeakValleyDto(capacity_data[0].value, 0, capacity_data[0].time, 'first'),PeakValleyDto(capacity_data[-1].value, len(capacity_data) - 1,capacity_data[-1].time, 'last')]def calculate_capacity(self, start: datetime, end: datetime):"""计算产能:return:"""capacity = 0capacity_data: list[InfluxdbStorage] = self.influxdb_util.query(bucket="JLKZQ",measurement="N_PLTS",every="1m",start=start,stop=end,createEmpty=False)# 获取峰谷值peak_valleys = self.check_out_peak_valley(capacity_data)# 拆成两个一组group_every_two = [peak_valleys[i:i + 2] for i in range(0, len(peak_valleys), 2)]for group in group_every_two:print(float(group[1].value), "-", float(group[0].value), "=", float(group[1].value) - float(group[0].value))capacity += float(group[1].value) - float(group[0].value)print("---------------------------")return capacitydef calculate_energy(self, start: datetime, end: datetime):"""计算能耗:return:"""energy = 0energy_data: list[InfluxdbStorage] = self.influxdb_util.query(bucket="TRQ",measurement="HZ_TRQ1_BKJLLTJ_GET_M3",every="1m",start=start,stop=end,createEmpty=False)# # 获取峰谷值# peak_valleys = self.check_out_peak_valley(energy_data)# # 拆成两个一组# group_every_two = [peak_valleys[i:i + 2] for i in range(0, len(peak_valleys), 2)]# for group in group_every_two:#     print(float(group[1].value), "-", float(group[0].value), "=", float(group[1].value) - float(group[0].value))#     energy += float(group[1].value) - float(group[0].value)# print("---------------------------")# return energy# 不需要峰谷计算,直接尾头相减即可11656.67857142858return float(energy_data[-1].value) - float(energy_data[0].value)def start(self, start: datetime, end: datetime) -> tuple:"""实时查询产能和能耗:param start::param end::return:"""capacity_num = self.calculate_capacity(start, end)energy_num = self.calculate_energy(start, end)print("计算范围:", start, " 到 ", end)print("产能:", capacity_num)print("能耗:", energy_num)return capacity_num, energy_numdef capacity_energy_db(self, start_date: datetime, end_date: datetime):"""产能能耗-按天计算-数据入库:return:"""# 这里应该是根据作业指导书来分别查询一天中不同的作业指导书时段,计算出各个桶数和相应的重量,他们之和作为产能 TODOstart_date = start_date.replace(hour=0, minute=0, second=0)end_date = end_date.replace(hour=0, minute=0, second=0)for i in range(end_date.__sub__(start_date).days):start = start_date + timedelta(days=i)end = start_date + + timedelta(days=i + 1)capacity_energy_service = CapacityEnergyService()capacity_num, energy_num = capacity_energy_service.start(start=start, end=end)report_capacity_energy = ReportCapacityEnergy(capacity_bucket='JLKZQ',energy_bucket='TRQ',capacity_measurement='N_PLTS',energy_measurement='HZ_TRQ1_BKJLLTJ_GET_M3',barrel_num=capacity_num,gas_num=energy_num,start_time=start,end_time=end,week=start.isocalendar().week,instruction_ids="",type=1,remarks='',)count = session.query(ReportCapacityEnergy) \.filter(ReportCapacityEnergy.start_time == start) \.filter(ReportCapacityEnergy.end_time == end) \.count()if count == 1:session.query(ReportCapacityEnergy) \.filter(ReportCapacityEnergy.start_time == start) \.filter(ReportCapacityEnergy.end_time == end) \.update({ReportCapacityEnergy.barrel_num: capacity_num, ReportCapacityEnergy.gas_num: energy_num})else:session.add(report_capacity_energy)session.commit()

python峰谷值算法:findpeaks相关推荐

  1. 程序 峰谷值 提取_ABAQUS:Python后处理—用excel提取位移、体积、应变等变化(一)...

    00 实例模型 一个金属长方体,我们需要对其做拉伸的加载约束示意图如图1,并在完成后采用Python命令流读取参考点的位移.体积.应变随加载时间的变化情况. 图1 金属长方体约束加载示意图 01 Py ...

  2. 程序 峰谷值 提取_医学影像组学特征值(Radiomics Features)提取之Pyradiomics(一)理论篇...

    医学影像组学特征值(Radiomics Features)提取之pyradiomics(一)理论篇 pyradiomics是一个开源的python软件包,可以从医学影像中提取出Radiomics影像组 ...

  3. 将数组修改为峰谷相间

    将数组修改为峰谷相间 题目说明: 在一个整数数组中,"峰"是大于或等于相邻整数的元素,相应地,"谷"是小于或等于相邻整数的元素.例如,在数组{5, 8, 6, ...

  4. python实现洗牌算法_洗牌算法及 random 中 shuffle 方法和 sample 方法浅析

    对于算法书买了一本又一本却没一本读完超过 10%,Leetcode 刷题从来没坚持超过 3 天的我来说,算法能力真的是渣渣.但是,今天决定写一篇跟算法有关的文章.起因是读了吴师兄的文章 <扫雷与 ...

  5. python语言入门w-Python算法基础

    有穷性:算法的有穷性是指算法必须能在执行有限个步骤之后终止: 确切性:算法的每一步骤必须有确切的定义: 输入项:一个算法有0个或多个输入,以刻画运算对象的初始情况,所谓0个输入是指算法本身定出了初始条 ...

  6. gbdt 算法比随机森林容易_用Python实现随机森林算法

    CDA数据分析师 出品 拥有高方差使得决策树(secision tress)在处理特定训练数据集时其结果显得相对脆弱.bagging(bootstrap aggregating 的缩写)算法从训练数据 ...

  7. python分割数字_对python数据切割归并算法的实例讲解

    当一个 .txt 文件的数据过于庞大,此时想要对数据进行排序就需要先将数据进行切割,然后通过归并排序,最终实现对整体数据的排序.要实现这个过程我们需要进行以下几步:获取总数据行数:根据行数按照自己的需 ...

  8. Python实现 logistic 回归算法

    Python实现 logistic 回归算法 1.算法介绍 模型描述: sigmoid函数: 原理: 优化目标:最小化 sigmoid(f(x)) 和真实标签的差别(有不同的 cost functio ...

  9. Python实现的导弹跟踪算法,燃!

    巴以冲突,想必都有关注. 有这么一个导弹拦截场面: 作者:半壶砂 https://www.cnblogs.com/halfsand/p/7976636.html 这里涉及拦截导弹的自动跟踪.最近,看到 ...

最新文章

  1. 电大本科计算机考试题库及答案,电大计算机考试题题库及答案.doc
  2. rds_dbsync数据源同步工具
  3. 杀java_java怎么杀掉java进程
  4. 从零开始 - iOSRTMP推流篇(1)
  5. ubuntu 安装php mcrypt扩展
  6. 华为鸿蒙深度研究(100页)
  7. mongodb 日志,(一个看日志解决新问题的方法) -- clwu
  8. Android全局变量使用
  9. 基于C#的图书管理系统
  10. java mars2_Mars-java 2.2.2 发布,不需要容器的 Java Web 开发框架
  11. java 获取指定时间的前一个小时
  12. Monte Carlo Integration
  13. GARCH-QR非线性回归(GQNR)交易模型(初稿)
  14. 使用数组快速填充Excel
  15. Markdown格式下符号及数学公式的输入
  16. 日期和时间处理(Python实现)
  17. 【Python】利用map和reduce编写一个str2float函数,把字符串'123.456'转换成浮点数123.456
  18. 基于Struts开发物流配送(快递)管理系统
  19. 致程序员:过了一面二面,千万别栽在HR面——70道HR面试题分享
  20. 彻底卸载Office 2010:Office 2010清理工具

热门文章

  1. chrome下旧显卡开启WebGL
  2. WebRTC -- P2P及NAT穿越技术介绍
  3. SCL语言如何进行运算和表达式计算?
  4. 笔试题-2023-华为-数字芯片(第2套)【纯净题目版】
  5. Mysql下载安装步骤
  6. APT威胁检测之flash类检测方法
  7. ()文献可视化--vosviewer入门
  8. 跟着玄武大佬学NTLM relay攻防
  9. java编写动物乐园_编写动物乐园java,满意会加分的!!!
  10. Github库名命名规范