简介

Prophet是facebook开源的时间序列预测工具,使用时间序列分解与机器学习拟合的方法进行建模预测
,关于prophet模型优点本文不再累述,网络上的文章也比较多了,各种可视化,参数的解释与demo演示,但是真正用到工业上大规模的可供学习的中文材料并不多。

本文打算使用PySpark进行多序列预测建模,会给出一个比较详细的脚本,供交流学习,重点在于使用hive数据/分布式,数据预处理,以及pandas_udf对多条序列进行循环执行。
tips:背景说明,在十万级别的sku序列上使用prophet预测每个序列未来七天的销售。


文章目录

  • 1.导入库和初始化设置
  • 2.数据预处理
  • 3.建模
  • 4.读取hive数据,调用spark进行prophet模型预测

1.导入库和初始化设置

Pandas Udf 构建在 Apache Arrow 之上,因此具有低开销,高性能的特点,udf对每条记录都会操作一次,数据在 JVM 和 Python 中传输,pandas_udf就是使用 Java 和 Scala 中定义 UDF,然后在 python 中调用。

#导入库
import datetime
from dateutil.relativedelta import relativedelta
from fbprophet import Prophet
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *#初始化
spark = SparkSession. \Builder(). \config("spark.sql.execution.arrow.enabled", "true"). \enableHiveSupport(). \getOrCreate()

其中初始化config:开启spark df与pandas df 相互转化的性能优化配置.


2.数据预处理


def sale_ds(df):df['ds'] = pd.to_datetime(df['ds'])df = df[['store_sku', 'ds', 'y']]# 控制长度,周期不用太长,关注最近的几个完整周期即可start_day = (df['ds'].max() -relativedelta(days=63)).strftime('%Y-%m-%d')df = df[df['ds'] >= start_day][['store_sku', 'ds', 'y']]# 筛选条件:1 序列长度大于等于14,且过去最少有七天的销售记录;# 条件1,保障模型有两个完整的周期数据;# 条件2,避免出现0,0,0,0,0,0,1,0,1这样数据稀疏的数据出现sale_set = df.groupby(['store_sku']).filter(lambda x: len(x) >= 14 and np.sum(x['y']) > 7)return sale_setdef replace_fill(data):"""先尝试使用上周的数据填补,再针对极端的数据进行cap,保障序列的完整和平滑性:param data:单个序列:param name: 序列名称,store_sku:return: 修复后的一条序列"""data['ds'] = pd.to_datetime(data['ds'], format='%Y-%m-%d')data['y'] = data['y'].astype(float)data.loc[data['y'] <= 0, 'y'] = np.NaNdata.loc[data['y'].isnull(), 'y'] = data['y'].shift(7).values[0]data.loc[data['y'].isnull(), 'y'] = data['y'].shift(-7).values[0]data.loc[data['y'].isnull(), 'y'] = data['y'].shift(-14).values[0]data.loc[data['y'].isnull(), 'y'] = data['y'].shift(14).values[0]data.loc[data['y'].isnull(), 'y'] = data['y'].interpolate(methon='nearest', order=3)low = data[data['y'] > 0]['y'].quantile(0.10)high = data[data['y'] > 0]['y'].quantile(0.90)data.loc[data['y'] < low, 'y'] = np.NaNdata.loc[data['y'] > high, 'y'] = np.NaNdata['y'] = data['y'].fillna(data['y'].mean())data['y'] = np.log1p(data['y'])return data

以上为数据预处理,具体内容见注释.

放入模型中的时间和y值名称必须是ds和y,首先控制数据的周期长度,如果预测天这种粒度的任务,则使用最近的4-6周即可。

因为是放入了长度不一的多个序列,为了让预测更加可靠,对序列的长度有一定的限定,比如,序列长度至少有14天,还要一个需要注意的问题是,如果出现0,0,0,0,0,0,1,0,1这样数据稀疏的数据的时候,prophet会报错,报错内容大致为,std太低,反推回去就是放入的数据类似于常量,模型无法拟合。

至于缺失值的填充,prophet可以设置y为nan,模型在拟合过程中也会自动填充一个预测值,因为我们预测的为sku销量,是具有星期这种周期性的,所以如果出现某一天的缺失,我们倾向于使用最近几周同期数据进行填充,没有优先使用均值或众数进行填充,是因为,均值和众数会掩盖序列的周期性,破坏整个序列的规律,为了进一步对数据进行平滑,对于异常值还进行了分位数盖帽,因为时序数据往往是偏态分布,所以我们对原始值做了取对数处理。

以上的数据预处理比较简单,其中多数可以使用hive进行操作,会更加高效,这里放出来的目的是演示一种思路以及python函数和最后的pandas_udf交互。


3.建模

def prophet_train(data):model = Prophet(daily_seasonality=False,yearly_seasonality=False,holidays=holiday_df,holidays_prior_scale=10)model.add_seasonality(name='weekly',period=7,fourier_order=3,prior_scale=0.10)model.fit(data)future = model.make_future_dataframe(periods=7, freq='d')forecast = model.predict(future)forecast['pro_pred'] = np.expm1(forecast['yhat'])forecast_df=forecast[['store_sku','ds','pro_pred']]# 对预测值修正forecast_df.loc[forecast_df['pro_pred'] < 0, 'pro_pred'] = 0low = (1 + 0.1) * data['y'].min()hight = min((1 + 0.05) * data['y'].max(), 10000)forecast_df.loc[forecast_df['pro_pred'] < low, 'pro_pred'] = lowforecast_df.loc[forecast_df['pro_pred'] > hight, 'pro_pred'] = hightreturn forecast_df

以上参数设置详见https://zhuanlan.zhihu.com/p/52330017

函数内部的holiday_df是假日数据,数据格式需要按照文档要求进行定义,改函数部分也会和整个代码一起放在github,如果序列中最近呈现出较大的下滑或者增长,那么预测值很容易得到负数或者非常大,这个时候我们依然需要对预测值进行修正,而非完全交给模型,当然你也可以在放入数据中设置上下限。

data['cap'] = 1000  #上限
data['floor'] = 6  #下限

该函数把前面的数据预处理函数和模型训练函数放在一个函数中,类似于主函数,目的是使用统一的输入和输出。

def prophet_main(data):true_time = pd.datetime.now().strftime('%Y-%m-%d')data.dropna(inplace=True)data['ds'] = pd.to_datetime(data['ds'])data = data[data['ds'] < true_time]data['ds'] = data['ds'].astype(str)data['ds'] = pd.to_datetime(data['ds'])# 异常值替换data = replace_fill(data)pro_back = prophet_train(data)return pro_back

4.读取hive数据,调用spark进行prophet模型预测

schema = StructType([StructField("store_sku", StringType()),StructField("ds", StringType()),StructField("pro_pred", DoubleType())
])@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def run_model(data):data['store_sku']=data['store_sku'].astype(str)df = prophet_main(data)uuid = data['store_sku'].iloc[0]df['store_sku']=uniddf['ds']=df['ds'].astype(str)df['pro_pred']=df['pro_pred'].astype(float)cols=['store_sku','ds','pro_pred']return df[cols]

假设我们希望输出的结果为三列,分别是store_sku,ds,pro_pred,则定义它们的数据类型,定义的数据类型和顺序要和放入的数据类型一致,然后通过@pandas_udf进行装饰,PandasUDFType有两种类型一种是Scalar(标量映射),另一种是Grouped Map(分组映射).我们显然是要使用分组映射,通过store_sku作为id进行分组,从而实现split-apply-combine

以上是纯python内容,下面展示通过hive数据库读取和运行python并把结果写入hive中。

data = spark.sql("""select concat(store_code,'_',goods_code) as store_sku,qty_fix as y,dsfrom scmtemp.redsku_store_sku_sale_fix_d""")
data.createOrReplaceTempView('data')
sale_predict = data.groupby(['store_sku']).apply(run_model)
sale_predict.createOrReplaceTempView('test_read_data')
# 保存到数据库
spark.sql(f"drop table if exists scmtemp.store_sku_sale_prophet")
spark.sql(f"create table scmtemp.store_sku_sale_prophet as select * from store_sku_predict_29 ")
print('完成预测')

当然也可以不用pandas_udf的形式进行
,在旧版spark中使用sc.parallelize()实现分组并行化
如:sc.parallelize(data,800).map(run_model).reduce(merge)

上文还有一个节假日数据没有给出来,限于篇幅有限,整个代码就放在github上了,如需要请自取。

基本交代清楚了,暂更于此。

完整代码[pyspark_prophet]

PySpark-prophet预测相关推荐

  1. 用Prophet预测USDCNY走势--------仿照forecasting-stock-perfomance-with-prophet对美元人民币走势进行预测

    时间序列预测一直是一项挺复杂且结果挺玄学的领域.一次偶然的机会,我在微信上看到一篇用Prophet对股票表现进行预测的文章,于是对这款不用提取特征的简易时序预测工具产生了好奇,想着仿照该文章思路对Pr ...

  2. 采用 facebook 的prophet 预测科大讯飞的股票开盘价

    在运行代码前需要安装fbprophet和tushare,然后拷贝代码可直接运行,个人认为facebook的这个开源不怎么样,回测的效果不行.不如lstm甚至简单的多元回归. #!/usr/bin/en ...

  3. linux python prophet,在python中使用Prophet预测每个类别的值

    我对用Python和Prophet做时间序列非常陌生.我有一个带有变量的数据集文章代码,日期和销售数量.我试图用python中的Prophet来预测每个月每件商品的销售量.dataset"/ ...

  4. 独家 | 手把手教你用Python的Prophet库进行时间序列预测

    作者:Jason Brownlee 翻译:殷之涵 校对:吴振东 本文长度为4800字,建议阅读10+分钟 本文为大家介绍了如何在Python中使用由Facebook开发的Prophet库进行自动化的时 ...

  5. 手把手教你用Prophet快速进行时间序列预测(附Prophet和R代码)

    作者:ANKIT CHOUDHARY 翻译:王雨桐 校对:丁楠雅 本文约3000字,建议阅读12分钟. 本文将通过拆解Prophet的原理及代码实例来讲解如何运用Prophet进行时间序列预测. 简介 ...

  6. porphet论文_Facebook 时间序列预测算法 Prophet 的研究

    Prophet 简介 Facebook 去年开源了一个时间序列预测的算法,叫做 fbprophet,它的官方网址与基本介绍来自于以下几个网站: 从官网的介绍来看,Facebook 所提供的 proph ...

  7. 基于Facebook开发的Prophet项目预测铁路货运量(实例)

    好久没有写东西了,今天写个之前做过的预测工作练手. 前情提要:Prophet项目是Facebook开发的一个时间序列的预测算法.算法使用起来简单,且对有明显内在规律的商业行为数据很有效, 因此常常被用 ...

  8. python 时间序列prophet 模型分析_手把手教你用Prophet快速进行时间序列预测(附Prophet和R代码)...

    原标题:手把手教你用Prophet快速进行时间序列预测(附Prophet和R代码) 作者:ANKIT CHOUDHARY:翻译:王雨桐:校对:丁楠雅: 本文约3000字,建议阅读12分钟. 本文将通过 ...

  9. python 时间序列prophet 模型分析_如何评价facebook开源的prophet时间序列预测工具?...

    近期整理了一下 Facebook 的 Prophet,个人感觉这是一个非常不错的时间序列预测工具. Prophet 简介 Facebook 去年开源了一个时间序列预测的算法,叫做 fbprophet, ...

最新文章

  1. nlohmann/json使用笔记
  2. 更改计算机名引起的奇怪问题:“重新启动计算机之前控制台无法刷新”
  3. bootstrap 开源框架demo_高大上的开源Springboot企业级用户权限系统
  4. 计算机辅助抗体设计,计算机辅助设计提高单克隆抗体亲和力的研究
  5. 首次运行 tensorflow 项目之 vgg 网络
  6. 挖矿赚加密货币?不如出租GPU计算机获取加倍的利润
  7. Java开发笔记(一百三十一)Swing的列表框
  8. CHI2019 ChewIt. An Intraoral Interface for Discreet Interactions
  9. mysql adodb_指南从MySQL转向ADODB的方法_MySQL
  10. deepin显卡驱动管理器在哪_deepin显卡设置
  11. 【python练习笔记】神秘的王宫
  12. DW_axi_dmac控制器(术语)
  13. 可以几分钟快速对接支付宝APP支付和手机网站支付?
  14. 转账到支付宝账户接口常见问题
  15. 标准机构发布物联网安全测试指南
  16. 数电快速入门(二)(复合逻辑运算和逻辑代数的基本定律的介绍)
  17. 将本地文件夹添加到 Git 仓库
  18. 在线招聘软件市场深度分析及发展研究预测报告
  19. 当程序员们决定去考公
  20. 通知计算机大赛的英语作文,奥林匹克英语作文大赛范文.doc

热门文章

  1. 中国再生金属行业运行现状与发展趋势研究报告2022版
  2. 腾讯视频再次宣布涨价
  3. html当中如何引用js文件
  4. ARMCortex系列主流的仿真调试器
  5. 让Windows加倍好看
  6. promise异步编程 详解
  7. Arch Linux 的安装配置
  8. Python爬虫第五课:存储数据
  9. python如何创建三维数组_python – numpy中的三维数组
  10. 【前端知识之JS】JS本地存储