spark streaming 滑动窗口

spark streaming 滑动窗口滑动窗口DStream.window(windowlength,slidinginterval) batchinterval:批处理时间间隔,sparkstreaming将消息源(Kafka)的数据,以流的方式按批处理时间间隔切片,一个批处理间隔时间对应1个切片对应生成的1个RDDwindowlength:窗口时间长度,每个批处理间隔将会实际处理的RDD个数(1…n…

大家好,又见面了,我是你们的朋友全栈君。

滑动窗口

DStream.window(window length,sliding interval)

 

batch interval:批处理时间间隔,spark streaming将消息源(Kafka)的数据,以流的方式按批处理时间间隔切片,一个批处理间隔时间对应1个切片对应生成的1个RDD

window length :窗口时间长度,每个批处理间隔将会实际处理的RDD个数(1…n)。是批处理间隔的N(N>=1)倍。

sliding interval:滑动窗口时间长度,窗口操作执行的时间间隔。如果设置为=batch interval,则每个批处理时间间隔都会执行一次窗口操作,如果设置为=N*processingInterval(N>1,N为Int),则每N个批处理时间间隔会执行一次窗口操作。

 

假设spark streaming 从kafka的largest 偏移量处开始消费

对于一个新的消费者:

每隔一次batch interval,会更新一次offset(拉取的数据为该batch interval内进入kafka的实时数据)

每隔一次sliding interval,会进行生成windowed DStream 操作,并执行逻辑,最后更新一次offset。其中生成的 windowed DStream的数据源为当前最后  window length对应的N个RDD的和(N>=sliding interval,且N=n*batch interval)。

     

对于一个旧的消费者:

每隔一次batch interval,会更新一次offset(拉取的数据为该batch interval内进入kafka的实时数据+之前保存的offset位置到当前位置的历史数据)

每隔一次sliding interval,会进行生成windowed DStream 操作,并执行逻辑,最后更新一次offset。其中生成的 windowed DStream的数据源为当前最后  window length包含的N个RDD的和(N>=sliding interval,且N=n*batch interval)。

                  

1.如果,window length=3Min,sliding interval=1Min,batch interval=1Min,假设spark streaming 从kafka的largest 偏移量处开始消费。

上述语义为:每隔1分钟,将当前最后3分钟的数据生成一个windowed DStream(如果有多个RDD,则合并他们)

spark streaming 滑动窗口

在第一个分钟里,会从kafka里面拉取新进入kafka里的第一分钟的数据并封装为RDD存储到内存,并拉取当前最后1分钟的数据生成一个windowed DStream执行print等action操作(为什么是当前最后1分钟?因为当前只有1分钟的数据)

两分钟过去后,会从kafka里面拉取新进入kafka里的第2分钟的数据并封装为RDD存储到内存,并拉取当前最后2分钟的数据生成一个windowed DStream执行print等action操作

3分钟过去后,会从kafka里面拉取新进入kafka里的第3分钟的数据并封装为RDD存储到内存,并拉取当前最后3分钟的数据生成一个windowed DStream执行print等action操作

4分钟过去后,会从kafka里面拉取新进入kafka里的第4分钟的数据并封装为RDD存储到内存,并拉取当前最后3分钟的数据生成一个windowed DStream执行print等action操作

5分钟过去后,会从kafka里面拉取新进入kafka里的第5分钟的数据并封装为RDD存储到内存,并拉取当前最后3分钟的数据生成一个windowed DStream执行print等action操作

….

2. 如果,window length=3Min,sliding interval=2Min,batch interval=1Min,假设spark streaming 从kafka的largest 偏移量处开始消费。

上述语义为:每隔2分钟,将当前最后3分钟的数据生成一个windowed DStream(如果有多个RDD,则合并他们)

spark streaming 滑动窗口

 

在14个batch interval 里会执行7次窗口数据处理,除了第一个窗口长度为2个batch interval以外,其他都为3个batch interval。

在第一个分钟里,会从kafka里面拉取新进入kafka里的第一分钟的数据并封装为RDD存储到内存

两分钟过去后,会从kafka里面拉取新进入kafka里的第2分钟的数据并封装为RDD存储到内存,执行print等action操作,这次会执行2个RDD里面的数据。

3分钟过去后,会从kafka里面拉取新进入kafka里的第3分钟的数据并封装为RDD存储到内存,不会执行print等action操作

4分钟过去后,会从kafka里面拉取新进入kafka里的第4分钟的数据并封装为RDD存储到内存,并拉取当前最后3分钟的数据生成一个windowed DStream执行print等action操作

5分钟过去后,会从kafka里面拉取新进入kafka里的第5分钟的数据并封装为RDD存储到内存,不会执行print等action操作

6分钟过去后,会从kafka里面拉取新进入kafka里的第6分钟的数据并封装为RDD存储到内存,并拉取当前最后3分钟的数据生成一个windowed DStream执行print等action操作

7分钟过去后,会从kafka里面拉取新进入kafka里的第7分钟的数据并封装为RDD存储到内存,不会执行print等action操作

…..

在实际应用中:window length – sliding interval >=应用中给定的需要统计的累计最大时长,这样才不会因为当前窗口遗漏某些特殊时间段的数据。

如有这样一个逻辑:要求判断连续30分钟的数据满足条件A,则得出结果B

spark streaming 滑动窗口

如果,让window length=30,sliding interval=10,batch interval=10,即window length – sliding interval < 30分钟

那么 从第5分钟开始连续,直到35分钟时结束连续的这段数据,将不能正常得到结果B

如果,让window length=40,sliding interval=10,batch interval=10,即window length – sliding interval =30分钟

那么 从第5分钟开始连续,直到35分钟时结束连续的这段数据,将可以正常得到结果B

 

觉得还行的话,右上角点个赞哟。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/149657.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • UE4 slate_ue4渲染动画

    UE4 slate_ue4渲染动画原创文章,转载请注明出处。点击观看下一篇《UE4Slate二用UMG思想去理解Slate》这里写目录标题前言Slate是什么?为什么要了解Slate?提前需要了解的模块(将对Slate学习很有帮助)前言Slate相关文章,使用引擎版本4.25.4源码版.一共11篇文章。网上Slate的文章介绍很少,所以在这里做一下该部分的文章并记录一下。同时也希望能帮助到大家。Slate是什么?一套GUI架构,它是虚幻引擎的UI架构。我们打开编辑器看到的界面,可以说95%以上(高概猜测数值,甚至100.

    2022年10月4日
    0
  • 两个正序数组 找中位数_leetcode合并两个有序数组

    两个正序数组 找中位数_leetcode合并两个有序数组原题连接给定两个大小分别为 m 和 n 的正序(从小到大)数组 nums1 和 nums2。请你找出并返回这两个正序数组的 中位数 。示例 1:输入:nums1 = [1,3], nums2 = [2]输出:2.00000解释:合并数组 = [1,2,3] ,中位数 2示例 2:输入:nums1 = [1,2], nums2 = [3,4]输出:2.50000解释:合并数组 = [1,2,3,4] ,中位数 (2 + 3) / 2 = 2.5示例 3:输入:nums1 = [0,

    2022年8月9日
    1
  • USES_CONVERSION宏定义

    USES_CONVERSION宏定义USES_CONVERSION是用来转换类型的(比如T2A等转换需用此宏),比如我们很常见的问题:在Socket编程时候,我们的IP地址从界面上输进去一般都使用CString类型的,可是在SOCKADDR_IN中的inet_addr却是const char *我们就不能直接用CString来用。我们就可以使用T2A()宏了。 SOCKADDR_IN localaddr; …

    2022年8月18日
    5
  • linux命令mysql启动,linux下启动mysql的命令

    linux命令mysql启动,linux下启动mysql的命令linux下启动mysql的命令一、总结一下:1.linux下启动mysql的命令:mysqladminstart/ect/init.d/mysqlstart(前面为mysql的安装路径)2.linux下重启mysql的命令:mysqladminrestart/ect/init.d/mysqlrestart(前面为mysql的安装路径)3.linux下关闭mysql的命令:mysqla…

    2022年5月21日
    37
  • generic host process已停止工作_windows error reporting 1001

    generic host process已停止工作_windows error reporting 1001故障现象:今天在虚拟机里装了win2003系统,每次重启进入系统时都会报错:generichostprocessforwin32services遇到了一个问题需要关闭。解决方法:先从google查了下相关问题,觉得没一个说来符合我的实际情况。于是回头仔细查看日志,怀疑是安装文件太旧引起的。于是更新补丁,当安装完了提示的99个补丁后,再重启进入系统,…

    2022年10月11日
    1
  • 编程入门先学什么 java_编程入门先学什么?java的快速学习方法

    编程入门先学什么 java_编程入门先学什么?java的快速学习方法如何快速的入门java?下面让达内广州java培训的小编分享一些干货给大家吧!一、掌握静态方法和属性静态方法和属性用于描述某一类对象群体的特征,而不是单个对象的特征。Java中大量应用了静态方法和属性,这是一个通常的技巧。但是这种技巧在很多语言中不被频繁地使用。理解静态方法和属性对于理解类与对象的关系是十分有帮助的,在大量的Java规范中,静态方法和属性被频繁使用。因此学习者应该理解静态方法和属性…

    2022年5月3日
    39

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号