Spark Streaming详解(重点窗口计算)

Spark Streaming详解(重点窗口计算)前面有几篇关于SparkStreaming的博客,那会只是作为Spark入门,快速体验Spark之用,只是照着葫芦画瓢。本文结合Spark官网上SparkStreaming的编程指南对SparkStreaming进行介绍StreamingContext如同SparkContext一样,StreamingContext也是SparkStreaming应用程序通往Spark集群的通道,它的定义…

大家好,又见面了,我是你们的朋友全栈君。

StreamingContext

如同SparkContext一样,StreamingContext也是Spark Streaming应用程序通往Spark集群的通道,它的定义如下:

Java代码  
收藏代码

  1. /** 
  2.  * Main entry point for Spark Streaming functionality. It provides methods used to create 
  3.  * [[org.apache.spark.streaming.dstream.DStream]]s from various input sources. It can be either 
  4.  * created by providing a Spark master URL and an appName, or from a org.apache.spark.SparkConf 
  5.  * configuration (see core Spark documentation), or from an existing org.apache.spark.SparkContext. 
  6.  * The associated SparkContext can be accessed using `context.sparkContext`. After 
  7.  * creating and transforming DStreams, the streaming computation can be started and stopped 
  8.  * using `context.start()` and `context.stop()`, respectively. 
  9.  * `context.awaitTermination()` allows the current thread to wait for the termination 
  10.  * of the context by `stop()` or by an exception. 
  11.  */  
  12. class StreamingContext private[streaming] (  
  13.     sc_ : SparkContext,  
  14.     cp_ : Checkpoint,  
  15.     batchDur_ : Duration  
  16.   ) extends Logging {  

 通过类的文档注释,我们看到:

1. 提供了从各种输入数据源创建DStream的方法

2,参数中的batchDur_是Duration类型的对象,比如Second(10),这个参数的含义是the time interval at which streaming data will be divided into batches,也就是说,假如batchDur_为Second(10)表示Spark Streaming会把每10秒钟的数据作为一个Batch,而一个Batch就是一个RDD?是的,一个RDD的数据对应一个batchInterval累加读取到的数据

 

DStream

Java代码  
收藏代码

  1. /** 
  2.  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous 
  3.  * sequence of RDDs (of the same type) representing a continuous stream of data (see 
  4.  * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs). 
  5.  * DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume, 
  6.  * etc.) using a [[org.apache.spark.streaming.StreamingContext]] or it can be generated by 
  7.  * transforming existing DStreams using operations such as `map`, 
  8.  * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream 
  9.  * periodically generates a RDD, either from live data or by transforming the RDD generated by a 
  10.  * parent DStream. 
  11.  * 
  12.  * This class contains the basic operations available on all DStreams, such as `map`, `filter` and 
  13.  * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains 
  14.  * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and 
  15.  * `join`. These operations are automatically available on any DStream of pairs 
  16.  * (e.g., DStream[(Int, Int)] through implicit conversions when 
  17.  * `org.apache.spark.streaming.StreamingContext._` is imported. 
  18.  * 
  19.  * DStreams internally is characterized by a few basic properties: 
  20.  *  – A list of other DStreams that the DStream depends on 
  21.  *  – A time interval at which the DStream generates an RDD 
  22.  *  – A function that is used to generate an RDD after each time interval 
  23.  */  

 从文档中,我们可以看到如下几点:

1. 对DStream实施map操作,会转换成另外一个DStream

2. DStream是一组连续的RDD序列,这些RDD中的元素的类型是一样的。DStream是一个时间上连续接收数据但是接受到的数据按照指定的时间(batchInterval)间隔切片,每个batchInterval都会构造一个RDD,因此,Spark Streaming实质上是根据batchInterval切分出来的RDD串,想象成糖葫芦,每个山楂就是一个batchInterval形成的RDD

3. 对DStream实施windows或者reduceByKeyAndWindow操作,也是转换成另外一个DStream(window操作是stateful DStream Transformation)

4. DStream同RDD一样,也定义了map,filter,window等操作,同时,对于元素类型为(K,V)的pair DStream,Spark Streaming提供了一个隐式转换的类,PairStreamFunctions

5. DStream内部有如下三个特性:

-DStream也有依赖关系,一个DStream可能依赖于其它的DStream(依赖关系的产生,同RDD是一样的)

-DStream创建RDD的时间间隔,这个时间间隔是不是就是构造StreamingContext传入的第三个参数?是的!

-在时间间隔到达后,DStream创建RDD的方法

Spark Streaming详解(重点窗口计算)

 在DStream内部,DStream表现为一系列的RDD的序列,针对DStream的操作(比如map,filter)会转换到它底层的RDD的操 作,由这个图中可以看出来,0-1这段时间的数据累积构成了RDD@time1,1-2这段时间的数据累积构成了RDD@time2,。。。也就是说,在 Spark Streaming中,DStream中的每个RDD的数据是一个时间窗口的累计。

 

 

下图展示了对DStream实施转换算子flatMap操作。需要指出的是,RDD的转换操作是由Spark Engine来实现的,原因是Spark Engine接受了原始的RDD以及作用于RDD上的算子,在计算结果时才真正的对RDD实施算子操作

 
Spark Streaming详解(重点窗口计算)
 

 

 

 

按照下面这幅图所呈现出来的含义是,Spark Streaming用于将输入的数据进行分解成一个一个的RDD,每个RDD交由Spark Engine进行处理以得到最后的处理数据?是的!

 

Spark Streaming详解(重点窗口计算)
上图中,Spark Streaming模块用于将接受到数据定时的切分成RDD(上图中定义为batch of input data),这些RDD交由Spark Engine进行计算。Spark Streaming模块负责数据接收并定时转换成一系列RDD,Spark Engine对Spark Streaming送过来的RDD进行计算

 

DStream层次关系

 

 

DStream的window操作

Java代码  
收藏代码

  1. /** 
  2.  * Return a new DStream in which each RDD contains all the elements in seen in a 
  3.  * sliding window of time over this DStream. 
  4.  * @param windowDuration width of the window; must be a multiple of this DStream’s 
  5.  *                       batching interval 
  6.  * @param slideDuration  sliding interval of the window (i.e., the interval after which 
  7.  *                       the new DStream will generate RDDs); must be a multiple of this 
  8.  *                       DStream’s batching interval 
  9.  */  
  10. def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = {  
  11.   new WindowedDStream(this, windowDuration, slideDuration)  
  12. }  

DStream与window相关的两个参数是windowDuration和slideDuration,这两个参数究竟表示什么含义。通过window操作,DStream转换为了WindowedDStream

windowDuration表示的是对过去的一个windowDuration时间间隔的数据进行统计计算, windowDuration是intervalBatch的整数倍,也就是说,假如windowDuration=n*intervalBatch, 那么window操作就是对过去的n个RDD进行统计计算
如下内容来自于Spark Streaming的官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html

 

Spark Streaming也提供了窗口计算(window computations)的功能,允许我们每隔一段时间(sliding duration)对过去一个时间段内(window duration)的数据进行转换操作(tranformation).

slideDruation控制着窗口计算的频度,windowDuration控制着窗口计算的时间跨度。slideDruation和windowDuration都必须是batchInterval的整数倍。假想如下一种场景:

windowDuration=3*batchInterval,

slideDuration=10*batchInterval,

表示的含义是每个10个时间间隔对之前的3个RDD进行统计计算,也意味着有7个RDD没在window窗口的统计范围内。slideDuration的默认值是batchInterval

 

 

下图展示了滑动窗口的概念
 

 

 
Spark Streaming详解(重点窗口计算)
如上图所示,一个滑动窗口时间段((sliding window length)内的所有RDD会进行合并以创建windowed DStream所对应的RDDD。每个窗口操作有两个参数:

 

  • window length – The duration of the window (3 in the figure),滑动窗口的时间跨度,指本次window操作所包含的过去的时间间隔(图中包含3个batch interval,可以理解时间单位)
  • sliding interval – The interval at which the window operation is performed (2 in the figure).(窗口操作执行的频率,即每隔多少时间计算一次)

These two parameters must be multiples of the batch interval of the source DStream (1 in the figure). 这表示,sliding window length的时间长度以及sliding interval都要是batch interval的整数倍。
batch interval是在构造StreamingContext时传入的(1 in the figure)
 

说明:

window length为什么是3?如椭圆形框,它是从第三秒开始算起(包括第三秒),第五秒结束,即包含3,4,5三个1秒,因此是3

sliding interval为什么是2?主要是看圆角矩形框的右边线,虚线的圆角矩形框的右边线在time 3结束, 实线的圆角矩形框的右边线在time 5结束,所以跨度是2。也就是看时间的最右侧即可,以右边线为基准,每个窗口操作(window length)占用了3个时间片。

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
表示每隔10秒钟对过去30秒钟产生的单词进行计数。这个方法有个不合理的地方,既然要求sliding window length和sliding interval都是batch interval的整数倍,那么此处为什么不用时间单位,而使用绝对的时间长度呢?

 

 

 

Spark Streaming Sources

这是Spark Streaming的数据输入源,包括两类:基本数据源和高级数据源

 

基本数据源

  • file systems
  • socket connections
  • Akka actors

以上数据源,StreamingContext的API直接提供,

  • fileStream,

监听HDFS文件系统的新文件的创建,读取其中内容。如果文件已存在而内容有变化,是不会被监听到的,因此只能将文件内容在某个位置写好后,然后移动到Spark Streaming监听的目录,如果文件在这个目录下内容发生变化,则Spark Streaming无法监听到

 

另外需要注意的是,Spark Streaming启动后,Spark Streaming通过文件的最后修改时间(modify time)来判断一个新加入到监听目录的文件是否有效。如果一个较长时间没有更新的文件move到监听目录,Spark Streaming也不会对它进行读取进而计算

 

Java代码  
收藏代码

  1. /** 
  2.  * Create a input stream that monitors a Hadoop-compatible filesystem 
  3.  * for new files and reads them using the given key-value types and input format. 
  4.  * Files must be written to the monitored directory by “moving” them from another 
  5.  * location within the same file system. File names starting with . are ignored. 
  6.  * @param directory HDFS directory to monitor for new file 
  7.  * @tparam K Key type for reading HDFS file 
  8.  * @tparam V Value type for reading HDFS file 
  9.  * @tparam F Input format for reading HDFS file 
  10.  */  
  11. def fileStream[  
  12.   K: ClassTag,  
  13.   V: ClassTag,  
  14.   F <: NewInputFormat[K, V]: ClassTag  
  15. ] (directory: String): InputDStream[(K, V)] = {  
  16.   new FileInputDStream[K, V, F](this, directory)  
  17. }  

 

  • socket connections

 

Java代码  
收藏代码

  1. /** 
  2.  * Create an input stream from TCP source hostname:port. Data is received using 
  3.  * a TCP socket and the receive bytes it interepreted as object using the given 
  4.  * converter. 
  5.  * @param hostname      Hostname to connect to for receiving data 
  6.  * @param port          Port to connect to for receiving data 
  7.  * @param converter     Function to convert the byte stream to objects 
  8.  * @param storageLevel  Storage level to use for storing the received objects 
  9.  * @tparam T            Type of the objects received (after converting bytes to objects) 
  10.  */  
  11. def socketStream[T: ClassTag](  
  12.     hostname: String,  
  13.     port: Int,  
  14.     converter: (InputStream) => Iterator[T],  
  15.     storageLevel: StorageLevel  
  16.   ): ReceiverInputDStream[T] = {  
  17.   new SocketInputDStream[T](this, hostname, port, converter, storageLevel)  
  18. }  

 问题: converter怎么使用?把InputStream转换为Iterator[T]集合

 

 高级数据源

 

Source Artifact

Kafka spark-streaming-kafka_2.10
Flume spark-streaming-flume_2.10
Kinesis spark-streaming-kinesis-asl_2.10 [Amazon Software License]
Twitter spark-streaming-twitter_2.10
ZeroMQ spark-streaming-zeromq_2.10
MQTT spark-streaming-mqtt_2.10

 

Spark Streaming注意点:

 1. When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using a input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL where n > number of receivers to run (see Spark Properties for information on how to set the master).Extending the logic to running on a cluster, the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process them.

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/148736.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 【CEGUI】CEGUI入门篇之创建window(四)

    【CEGUI】CEGUI入门篇之创建window(四)以下内容翻译自http://static.cegui.org.uk/docs/0.8.7/window_tutorial.html这里介绍CEGUIwindow的创建及如何让window在屏幕上显示出来,在此之前,需要了解“CEGUI入门篇之初始化(一)”、“CEGUI入门篇之使用ResourceProvider加载资源(二)”和“CEGUI入门篇之数据文件及默认初始化(三)”。1、window和

    2022年7月23日
    10
  • SQL Server 2012 数据库备份还原「建议收藏」

    SQL Server 2012 数据库备份还原「建议收藏」文章目录1.数据库备份2.创建备份设备使用SSMS工具创建备份设备使用SQL方式创建备份设备3.完整备份与还原使用SSMS工具完整备份与还原使用SQL方式完整备份与还原4.差异备份与还原使用SSMS工具差异备份与还原使用SQL方式差异备份与还原5.事务日志备份与还原使用SSMS工具事务日志备份与还原使用SQL方式事务日志备份与还原1.数据库备份    数据库备份,即从SQLServer数据…

    2022年5月14日
    61
  • 将截断字符串或二进制数据解决办法_数据库从字符串转换日期失败

    将截断字符串或二进制数据解决办法_数据库从字符串转换日期失败在EF中,使用CodeFirst给实体添加约束的时候,使用NeGut控制台进行更新到数据库中,先使用add-migrationmigrationName命令进行创建(migrationName是进行更新的名字),然后使用Update-Database进行更新到数据库,此时报出问题:将截断字符串或二进制数据。语句已终止。错误原因为什么会报出这个问题,原因就是添加的限制和数据库中已…

    2022年10月7日
    0
  • Navicat Premium 15激活码【2021.10最新】

    (Navicat Premium 15激活码)最近有小伙伴私信我,问我这边有没有免费的intellijIdea的激活码,然后我将全栈君台教程分享给他了。激活成功之后他一直表示感谢,哈哈~https://javaforall.net/100143.htmlIntelliJ2021最新激活注册码,破解教程可免费永久激活,亲测有效,上面是详细链接哦~09LV…

    2022年3月28日
    59
  • 大数据应用及其解决方案(完整版)

    大数据应用及其解决方案(完整版)目录1、大数据概述1.1.概述1.2.大数据定义1.3.大数据技术发展2、大数据应用2.1.大数据应用阐述2.2.大数据应用架构2.3.大数据行业应用2.3.1.医疗行业2.3.2.能源行业2.3.3.通信行业2.3.4.零售业3、大数据解决方案3.1.大数据技术组成3.1.1.分析技术3.1.2.存储数据库…

    2022年6月2日
    49
  • Nginx修改默认端口80

    Nginx修改默认端口80前言    安装流程请参考我的文章–Windows下安装Nginx。    博客地址:https://blog.csdn.net/zengwende/article/details/86610692修改步骤1、打开Nginx的配置文件nginx.conf2、修改默认端口的值即可(nginx默认的端口为80) …

    2022年9月8日
    0

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号