spark streaming 滑动窗口

spark streaming 滑动窗口滑动窗口DStream.window(windowlength,slidinginterval) batchinterval:批处理时间间隔,sparkstreaming将消息源(Kafka)的数据,以流的方式按批处理时间间隔切片,一个批处理间隔时间对应1个切片对应生成的1个RDDwindowlength:窗口时间长度,每个批处理间隔将会实际处理的RDD个数(1…n…

大家好,又见面了,我是你们的朋友全栈君。

滑动窗口

DStream.window(window length,sliding interval)

 

batch interval:批处理时间间隔,spark streaming将消息源(Kafka)的数据,以流的方式按批处理时间间隔切片,一个批处理间隔时间对应1个切片对应生成的1个RDD

window length :窗口时间长度,每个批处理间隔将会实际处理的RDD个数(1…n)。是批处理间隔的N(N>=1)倍。

sliding interval:滑动窗口时间长度,窗口操作执行的时间间隔。如果设置为=batch interval,则每个批处理时间间隔都会执行一次窗口操作,如果设置为=N*processingInterval(N>1,N为Int),则每N个批处理时间间隔会执行一次窗口操作。

 

假设spark streaming 从kafka的largest 偏移量处开始消费

对于一个新的消费者:

每隔一次batch interval,会更新一次offset(拉取的数据为该batch interval内进入kafka的实时数据)

每隔一次sliding interval,会进行生成windowed DStream 操作,并执行逻辑,最后更新一次offset。其中生成的 windowed DStream的数据源为当前最后  window length对应的N个RDD的和(N>=sliding interval,且N=n*batch interval)。

     

对于一个旧的消费者:

每隔一次batch interval,会更新一次offset(拉取的数据为该batch interval内进入kafka的实时数据+之前保存的offset位置到当前位置的历史数据)

每隔一次sliding interval,会进行生成windowed DStream 操作,并执行逻辑,最后更新一次offset。其中生成的 windowed DStream的数据源为当前最后  window length包含的N个RDD的和(N>=sliding interval,且N=n*batch interval)。

                  

1.如果,window length=3Min,sliding interval=1Min,batch interval=1Min,假设spark streaming 从kafka的largest 偏移量处开始消费。

上述语义为:每隔1分钟,将当前最后3分钟的数据生成一个windowed DStream(如果有多个RDD,则合并他们)

spark streaming 滑动窗口

在第一个分钟里,会从kafka里面拉取新进入kafka里的第一分钟的数据并封装为RDD存储到内存,并拉取当前最后1分钟的数据生成一个windowed DStream执行print等action操作(为什么是当前最后1分钟?因为当前只有1分钟的数据)

两分钟过去后,会从kafka里面拉取新进入kafka里的第2分钟的数据并封装为RDD存储到内存,并拉取当前最后2分钟的数据生成一个windowed DStream执行print等action操作

3分钟过去后,会从kafka里面拉取新进入kafka里的第3分钟的数据并封装为RDD存储到内存,并拉取当前最后3分钟的数据生成一个windowed DStream执行print等action操作

4分钟过去后,会从kafka里面拉取新进入kafka里的第4分钟的数据并封装为RDD存储到内存,并拉取当前最后3分钟的数据生成一个windowed DStream执行print等action操作

5分钟过去后,会从kafka里面拉取新进入kafka里的第5分钟的数据并封装为RDD存储到内存,并拉取当前最后3分钟的数据生成一个windowed DStream执行print等action操作

….

2. 如果,window length=3Min,sliding interval=2Min,batch interval=1Min,假设spark streaming 从kafka的largest 偏移量处开始消费。

上述语义为:每隔2分钟,将当前最后3分钟的数据生成一个windowed DStream(如果有多个RDD,则合并他们)

spark streaming 滑动窗口

 

在14个batch interval 里会执行7次窗口数据处理,除了第一个窗口长度为2个batch interval以外,其他都为3个batch interval。

在第一个分钟里,会从kafka里面拉取新进入kafka里的第一分钟的数据并封装为RDD存储到内存

两分钟过去后,会从kafka里面拉取新进入kafka里的第2分钟的数据并封装为RDD存储到内存,执行print等action操作,这次会执行2个RDD里面的数据。

3分钟过去后,会从kafka里面拉取新进入kafka里的第3分钟的数据并封装为RDD存储到内存,不会执行print等action操作

4分钟过去后,会从kafka里面拉取新进入kafka里的第4分钟的数据并封装为RDD存储到内存,并拉取当前最后3分钟的数据生成一个windowed DStream执行print等action操作

5分钟过去后,会从kafka里面拉取新进入kafka里的第5分钟的数据并封装为RDD存储到内存,不会执行print等action操作

6分钟过去后,会从kafka里面拉取新进入kafka里的第6分钟的数据并封装为RDD存储到内存,并拉取当前最后3分钟的数据生成一个windowed DStream执行print等action操作

7分钟过去后,会从kafka里面拉取新进入kafka里的第7分钟的数据并封装为RDD存储到内存,不会执行print等action操作

…..

在实际应用中:window length – sliding interval >=应用中给定的需要统计的累计最大时长,这样才不会因为当前窗口遗漏某些特殊时间段的数据。

如有这样一个逻辑:要求判断连续30分钟的数据满足条件A,则得出结果B

spark streaming 滑动窗口

如果,让window length=30,sliding interval=10,batch interval=10,即window length – sliding interval < 30分钟

那么 从第5分钟开始连续,直到35分钟时结束连续的这段数据,将不能正常得到结果B

如果,让window length=40,sliding interval=10,batch interval=10,即window length – sliding interval =30分钟

那么 从第5分钟开始连续,直到35分钟时结束连续的这段数据,将可以正常得到结果B

 

觉得还行的话,右上角点个赞哟。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/149657.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • pycharm安装后运行不了_pycharm上无法安装各种库

    pycharm安装后运行不了_pycharm上无法安装各种库原博客链接:http://blog.csdn.net/qingyuanluofeng/article/details/46501427问题:pycharm安装后不能执行python脚本。我的是执行后老是报错,但是之前在cpython中都是可以的。于是上网查询解决方法原因:pycharm没有设置解析器/解释器设置错误(我的就是因为这个之前设置错了,位置也是错的,结果导致程序不能正常运行出

    2022年8月26日
    4
  • 风控模型评估

    风控模型评估  本文总结了一下评分卡建模过程中常用的模型评估方法,并结合代码展示,理论结合实际,方便初学者对模型评估的方法有深入的理解。之前写过一篇模型评估的指标,偏于理论,详情见风控模型指标详解。1.AUC  AUC值指的是ROC曲线下面积大小,该值能够量化反映基于ROC曲线衡量的模型性能。所以,需要了解ROC曲线的绘制方法。  首先,需要了解TPR(真阳性率)和FPR(假阳性率)。TPR就是P个正…

    2022年4月30日
    146
  • 数据库查询常用语句语法

    数据库查询常用语句语法selectxxfrom表格where内容=””检查一个项目是否在列表中可以用in列表,用(‘’,’’,’‘)单引号分隔开名字不在里面namenotin(‘’,‘‘,‘‘)名字在里面namein(‘’,‘‘,‘‘)andorname=’Argentina’orname=’Australia’等同于nameinIN(‘Argentina’,’Australia’)betweenxxandyy…

    2022年4月30日
    39
  • 什么叫侧面指纹识别_侧面指纹识别方案还有哪些问题没有解决?

    什么叫侧面指纹识别_侧面指纹识别方案还有哪些问题没有解决?当我们回首手机圈不难发现两个关键词——金属机身以及指纹识别。虽然指纹识别功能并不是直到今年才出现在智能手机之上,在这一年中,各大手机厂商先是采用了与iPhone相同的正面指纹识别方案,然而没过多久便有厂商提出,指纹识别功能放在机身背面更好用(成本低才是真相);一时间,关于“内裤到底该正着穿,还是反着穿?”的争论,成为各大手机发布会必谈的话题之一。直到侧面指纹识别方案的出现,广大的手机用户才发现,原…

    2022年8月10日
    4
  • MongoDB(四)—-MongoDB的文档操作

    MongoDB(四)—-MongoDB的文档操作

    2020年11月12日
    166
  • 平衡二叉树与红黑树的区别_平衡二叉树怎么构造

    平衡二叉树与红黑树的区别_平衡二叉树怎么构造平衡二叉树与红黑树一、红黑树的性质:二、红黑树的主要用途,和其他树的比较:三、运用场景一、红黑树的性质:  红黑树是一颗二叉搜索树,通过对任何一条从根到叶子的简单路径上各个结点的颜色进行约束,红黑树确保没有一条路径会比其他路径长出2倍,因而是近似于平衡的。  树的每个结点包含5个属性,color,key,left,right,p。如果一个结点没有子结点或父结点,则该结点的响应指针属性的指为…

    2022年10月21日
    3

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号