PySpark-prophet预测

PySpark-prophet预测简介Prophet是facebook开源时间序列预测工具,使用时间序列分解与机器学习拟合的方法进行建模预测。关于prophet模型优点我不打算说,网络上的文章非常多,各种可视化,和参数的解释与demo演示。但是在正在用到工业上大规模的可供学习的中文材料并不多。本文打算使用pyspark进行多序列预测建模,会给出一个比较详细的脚本,供交流学习,重点在于使用hive数据/分布式,以及中间数据预处理,以及pandas_udf对多条序列进行循环执行。背景说明,在十万级别的sku序列上使用prophet预测每

大家好,又见面了,我是你们的朋友全栈君。

简介

Prophet是facebook开源的时间序列预测工具,使用时间序列分解与机器学习拟合的方法进行建模预测
,关于prophet模型优点本文不再累述,网络上的文章也比较多了,各种可视化,参数的解释与demo演示,但是真正用到工业上大规模的可供学习的中文材料并不多。

本文打算使用PySpark进行多序列预测建模,会给出一个比较详细的脚本,供交流学习,重点在于使用hive数据/分布式,数据预处理,以及pandas_udf对多条序列进行循环执行。
tips:背景说明,在十万级别的sku序列上使用prophet预测每个序列未来七天的销售。


1.导入库和初始化设置

Pandas Udf 构建在 Apache Arrow 之上,因此具有低开销,高性能的特点,udf对每条记录都会操作一次,数据在 JVM 和 Python 中传输,pandas_udf就是使用 Java 和 Scala 中定义 UDF,然后在 python 中调用。

#导入库
import datetime
from dateutil.relativedelta import relativedelta
from fbprophet import Prophet
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

#初始化
spark = SparkSession. \
    Builder(). \
    config("spark.sql.execution.arrow.enabled", "true"). \
    enableHiveSupport(). \
    getOrCreate()

其中初始化config:开启spark df与pandas df 相互转化的性能优化配置.


2.数据预处理


def sale_ds(df):
    df['ds'] = pd.to_datetime(df['ds'])
    df = df[['store_sku', 'ds', 'y']]
    # 控制长度,周期不用太长,关注最近的几个完整周期即可
    start_day = (
            df['ds'].max() -
            relativedelta(
                days=63)).strftime('%Y-%m-%d')
    df = df[df['ds'] >= start_day][['store_sku', 'ds', 'y']]
    # 筛选条件:1 序列长度大于等于14,且过去最少有七天的销售记录;
    # 条件1,保障模型有两个完整的周期数据;
    # 条件2,避免出现0,0,0,0,0,0,1,0,1这样数据稀疏的数据出现
    sale_set = df.groupby(
        ['store_sku']).filter(
        lambda x: len(x) >= 14 and np.sum(
            x['y']) > 7)
    return sale_set


def replace_fill(data):
    """ 先尝试使用上周的数据填补,再针对极端的数据进行cap,保障序列的完整和平滑性 :param data:单个序列 :param name: 序列名称,store_sku :return: 修复后的一条序列 """
    data['ds'] = pd.to_datetime(data['ds'], format='%Y-%m-%d')
    data['y'] = data['y'].astype(float)
    data.loc[data['y'] <= 0, 'y'] = np.NaN
    data.loc[data['y'].isnull(), 'y'] = data['y'].shift(7).values[0]
    data.loc[data['y'].isnull(), 'y'] = data['y'].shift(-7).values[0]
    data.loc[data['y'].isnull(), 'y'] = data['y'].shift(-14).values[0]
    data.loc[data['y'].isnull(), 'y'] = data['y'].shift(14).values[0]
    data.loc[data['y'].isnull(), 'y'] = data['y'].interpolate(methon='nearest', order=3)
    low = data[data['y'] > 0]['y'].quantile(0.10)
    high = data[data['y'] > 0]['y'].quantile(0.90)
    data.loc[data['y'] < low, 'y'] = np.NaN
    data.loc[data['y'] > high, 'y'] = np.NaN
    data['y'] = data['y'].fillna(data['y'].mean())
    data['y'] = np.log1p(data['y'])
    return data

以上为数据预处理,具体内容见注释.

放入模型中的时间和y值名称必须是ds和y,首先控制数据的周期长度,如果预测天这种粒度的任务,则使用最近的4-6周即可。

因为是放入了长度不一的多个序列,为了让预测更加可靠,对序列的长度有一定的限定,比如,序列长度至少有14天,还要一个需要注意的问题是,如果出现0,0,0,0,0,0,1,0,1这样数据稀疏的数据的时候,prophet会报错,报错内容大致为,std太低,反推回去就是放入的数据类似于常量,模型无法拟合。

至于缺失值的填充,prophet可以设置y为nan,模型在拟合过程中也会自动填充一个预测值,因为我们预测的为sku销量,是具有星期这种周期性的,所以如果出现某一天的缺失,我们倾向于使用最近几周同期数据进行填充,没有优先使用均值或众数进行填充,是因为,均值和众数会掩盖序列的周期性,破坏整个序列的规律,为了进一步对数据进行平滑,对于异常值还进行了分位数盖帽,因为时序数据往往是偏态分布,所以我们对原始值做了取对数处理。

以上的数据预处理比较简单,其中多数可以使用hive进行操作,会更加高效,这里放出来的目的是演示一种思路以及python函数和最后的pandas_udf交互。


3.建模

def prophet_train(data):
    model = Prophet(
        daily_seasonality=False,
        yearly_seasonality=False,
        holidays=holiday_df,
        holidays_prior_scale=10)
    model.add_seasonality(
        name='weekly',
        period=7,
        fourier_order=3,
        prior_scale=0.10)
    model.fit(data)
    future = model.make_future_dataframe(periods=7, freq='d')
    forecast = model.predict(future)
    forecast['pro_pred'] = np.expm1(forecast['yhat'])
    forecast_df=forecast[['store_sku','ds','pro_pred']]
    # 对预测值修正
    forecast_df.loc[forecast_df['pro_pred'] < 0, 'pro_pred'] = 0
    low = (1 + 0.1) * data['y'].min()
    hight = min((1 + 0.05) * data['y'].max(), 10000)
    forecast_df.loc[forecast_df['pro_pred'] < low, 'pro_pred'] = low
    forecast_df.loc[forecast_df['pro_pred'] > hight, 'pro_pred'] = hight
    return forecast_df

以上参数设置详见https://zhuanlan.zhihu.com/p/52330017

函数内部的holiday_df是假日数据,数据格式需要按照文档要求进行定义,改函数部分也会和整个代码一起放在github,如果序列中最近呈现出较大的下滑或者增长,那么预测值很容易得到负数或者非常大,这个时候我们依然需要对预测值进行修正,而非完全交给模型,当然你也可以在放入数据中设置上下限。

data['cap'] = 1000  #上限
data['floor'] = 6  #下限

该函数把前面的数据预处理函数和模型训练函数放在一个函数中,类似于主函数,目的是使用统一的输入和输出。

def prophet_main(data):
    true_time = pd.datetime.now().strftime('%Y-%m-%d')
    data.dropna(inplace=True)
    data['ds'] = pd.to_datetime(data['ds'])
    data = data[data['ds'] < true_time]
    data['ds'] = data['ds'].astype(str)
    data['ds'] = pd.to_datetime(data['ds'])
    # 异常值替换
    data = replace_fill(data)
    pro_back = prophet_train(data)
    return pro_back

4.读取hive数据,调用spark进行prophet模型预测

schema = StructType([
    StructField("store_sku", StringType()),
    StructField("ds", StringType()),
    StructField("pro_pred", DoubleType())
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def run_model(data):
    data['store_sku']=data['store_sku'].astype(str)
    df = prophet_main(data)
    uuid = data['store_sku'].iloc[0]
    df['store_sku']=unid
    df['ds']=df['ds'].astype(str)
    df['pro_pred']=df['pro_pred'].astype(float)
    cols=['store_sku','ds','pro_pred']
    return df[cols]

假设我们希望输出的结果为三列,分别是store_sku,ds,pro_pred,则定义它们的数据类型,定义的数据类型和顺序要和放入的数据类型一致,然后通过@pandas_udf进行装饰,PandasUDFType有两种类型一种是Scalar(标量映射),另一种是Grouped Map(分组映射).我们显然是要使用分组映射,通过store_sku作为id进行分组,从而实现split-apply-combine

以上是纯python内容,下面展示通过hive数据库读取和运行python并把结果写入hive中。

data = spark.sql(
    """ select concat(store_code,'_',goods_code) as store_sku,qty_fix as y,ds from scmtemp.redsku_store_sku_sale_fix_d""")
data.createOrReplaceTempView('data')
sale_predict = data.groupby(['store_sku']).apply(run_model)
sale_predict.createOrReplaceTempView('test_read_data')
# 保存到数据库
spark.sql(f"drop table if exists scmtemp.store_sku_sale_prophet")
spark.sql(f"create table scmtemp.store_sku_sale_prophet as select * from store_sku_predict_29 ")
print('完成预测')

当然也可以不用pandas_udf的形式进行
,在旧版spark中使用sc.parallelize()实现分组并行化
如:sc.parallelize(data,800).map(run_model).reduce(merge)

上文还有一个节假日数据没有给出来,限于篇幅有限,整个代码就放在github上了,如需要请自取。

基本交代清楚了,暂更于此。

完整代码[pyspark_prophet]

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/151737.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • MongoDB(三)——CRUD

    MongoDB(三)——CRUD

    2022年1月11日
    41
  • 记录Depix工具的使用

    记录Depix工具的使用这个月初有一个挺让人振奋的消息,说是出了一款开源去“马赛克“工具,三天就收获了3k+star,现在star数已经达到了13.7k了,项目地址:https://github.com/beurtschipper/Depix。我是没有这种世俗的欲望的,所以当时就没关注,直到昨天”纵横杯”网络安全竞赛Misc中专门出了一个马赛克的题,就给出了下面这张图片师傅们调侃说近视眼摘下眼镜离远一点就能看清楚了,我试了一下只能看出轮廓,但是还是看不清,这就需要用到Depix这个神奇的工具了。我这里是在windows系

    2022年6月29日
    116
  • ios消息推送机制_iPhone消息推送

    ios消息推送机制_iPhone消息推送原文地址:http://www.apkbus.com/android-130195-1-5.html推送是解决轮询所造成的流量消耗和电量消耗的一个比较好的解决方案,在Android上,虽然Google提供了GCM(之前为C2DM),但在国内基本等于没用,各大Android应用基本都自己架设推送Server或是使用第三方推送平台,例如新浪微博使用第三方推送平台“个推”(非广告 )。今天要

    2022年9月28日
    1
  • 那些惊艳的算法们(三)—— 时间轮[通俗易懂]

    那些惊艳的算法们(三)—— 时间轮[通俗易懂]同步发表于:http://blog.lanjingdejia.com/articles/2018/08/13/1534132662997.html从定时任务说起自然界中定时任务无处不在,太阳每天东升西落,候鸟的迁徙,树木的年轮,人们每天按时上班,每个月按时发工资、交房租,四季轮换,潮涨潮落,等等,从某种意义上说,都可以认为是定时任务。大概很少有人想过,这些“定时”是怎样做到的。当然,计算机…

    2022年10月1日
    0
  • c#开发微信公众平台_小程序开发教程

    c#开发微信公众平台_小程序开发教程本文转自http://www.wuling365.com/Article/View.aspx?Id=30  想学习微信开发技术请加入我们!郴州微信开发QQ群:587978628  最近在开发“郴州微信营销”系统,网络上涉及微信开发的代码99%都是PHP写的,由于本人不想再学习PHP,于是决定用C#开发。现将开发过程及一些实现细节记录下来,供大家参考。由于本人能力有限,错误之处在所难免,欢

    2022年8月20日
    6
  • 中文写代码?开始不信后来用中文写了剧情小游戏!嗯,真香~

    中文写代码?开始不信后来用中文写了剧情小游戏!嗯,真香~你还不知道可以用中文编写脚本制作游戏?那还不赶紧点进来看看~

    2022年6月16日
    36

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号