Spark Streaming详解(重点窗口计算)

Spark Streaming详解(重点窗口计算)前面有几篇关于SparkStreaming的博客,那会只是作为Spark入门,快速体验Spark之用,只是照着葫芦画瓢。本文结合Spark官网上SparkStreaming的编程指南对SparkStreaming进行介绍StreamingContext如同SparkContext一样,StreamingContext也是SparkStreaming应用程序通往Spark集群的通道,它的定义…

大家好,又见面了,我是你们的朋友全栈君。

StreamingContext

如同SparkContext一样,StreamingContext也是Spark Streaming应用程序通往Spark集群的通道,它的定义如下:

Java代码  
收藏代码

  1. /** 
  2.  * Main entry point for Spark Streaming functionality. It provides methods used to create 
  3.  * [[org.apache.spark.streaming.dstream.DStream]]s from various input sources. It can be either 
  4.  * created by providing a Spark master URL and an appName, or from a org.apache.spark.SparkConf 
  5.  * configuration (see core Spark documentation), or from an existing org.apache.spark.SparkContext. 
  6.  * The associated SparkContext can be accessed using `context.sparkContext`. After 
  7.  * creating and transforming DStreams, the streaming computation can be started and stopped 
  8.  * using `context.start()` and `context.stop()`, respectively. 
  9.  * `context.awaitTermination()` allows the current thread to wait for the termination 
  10.  * of the context by `stop()` or by an exception. 
  11.  */  
  12. class StreamingContext private[streaming] (  
  13.     sc_ : SparkContext,  
  14.     cp_ : Checkpoint,  
  15.     batchDur_ : Duration  
  16.   ) extends Logging {  

 通过类的文档注释,我们看到:

1. 提供了从各种输入数据源创建DStream的方法

2,参数中的batchDur_是Duration类型的对象,比如Second(10),这个参数的含义是the time interval at which streaming data will be divided into batches,也就是说,假如batchDur_为Second(10)表示Spark Streaming会把每10秒钟的数据作为一个Batch,而一个Batch就是一个RDD?是的,一个RDD的数据对应一个batchInterval累加读取到的数据

 

DStream

Java代码  
收藏代码

  1. /** 
  2.  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous 
  3.  * sequence of RDDs (of the same type) representing a continuous stream of data (see 
  4.  * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs). 
  5.  * DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume, 
  6.  * etc.) using a [[org.apache.spark.streaming.StreamingContext]] or it can be generated by 
  7.  * transforming existing DStreams using operations such as `map`, 
  8.  * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream 
  9.  * periodically generates a RDD, either from live data or by transforming the RDD generated by a 
  10.  * parent DStream. 
  11.  * 
  12.  * This class contains the basic operations available on all DStreams, such as `map`, `filter` and 
  13.  * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains 
  14.  * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and 
  15.  * `join`. These operations are automatically available on any DStream of pairs 
  16.  * (e.g., DStream[(Int, Int)] through implicit conversions when 
  17.  * `org.apache.spark.streaming.StreamingContext._` is imported. 
  18.  * 
  19.  * DStreams internally is characterized by a few basic properties: 
  20.  *  – A list of other DStreams that the DStream depends on 
  21.  *  – A time interval at which the DStream generates an RDD 
  22.  *  – A function that is used to generate an RDD after each time interval 
  23.  */  

 从文档中,我们可以看到如下几点:

1. 对DStream实施map操作,会转换成另外一个DStream

2. DStream是一组连续的RDD序列,这些RDD中的元素的类型是一样的。DStream是一个时间上连续接收数据但是接受到的数据按照指定的时间(batchInterval)间隔切片,每个batchInterval都会构造一个RDD,因此,Spark Streaming实质上是根据batchInterval切分出来的RDD串,想象成糖葫芦,每个山楂就是一个batchInterval形成的RDD

3. 对DStream实施windows或者reduceByKeyAndWindow操作,也是转换成另外一个DStream(window操作是stateful DStream Transformation)

4. DStream同RDD一样,也定义了map,filter,window等操作,同时,对于元素类型为(K,V)的pair DStream,Spark Streaming提供了一个隐式转换的类,PairStreamFunctions

5. DStream内部有如下三个特性:

-DStream也有依赖关系,一个DStream可能依赖于其它的DStream(依赖关系的产生,同RDD是一样的)

-DStream创建RDD的时间间隔,这个时间间隔是不是就是构造StreamingContext传入的第三个参数?是的!

-在时间间隔到达后,DStream创建RDD的方法

Spark Streaming详解(重点窗口计算)

 在DStream内部,DStream表现为一系列的RDD的序列,针对DStream的操作(比如map,filter)会转换到它底层的RDD的操 作,由这个图中可以看出来,0-1这段时间的数据累积构成了RDD@time1,1-2这段时间的数据累积构成了RDD@time2,。。。也就是说,在 Spark Streaming中,DStream中的每个RDD的数据是一个时间窗口的累计。

 

 

下图展示了对DStream实施转换算子flatMap操作。需要指出的是,RDD的转换操作是由Spark Engine来实现的,原因是Spark Engine接受了原始的RDD以及作用于RDD上的算子,在计算结果时才真正的对RDD实施算子操作

 
Spark Streaming详解(重点窗口计算)
 

 

 

 

按照下面这幅图所呈现出来的含义是,Spark Streaming用于将输入的数据进行分解成一个一个的RDD,每个RDD交由Spark Engine进行处理以得到最后的处理数据?是的!

 

Spark Streaming详解(重点窗口计算)
上图中,Spark Streaming模块用于将接受到数据定时的切分成RDD(上图中定义为batch of input data),这些RDD交由Spark Engine进行计算。Spark Streaming模块负责数据接收并定时转换成一系列RDD,Spark Engine对Spark Streaming送过来的RDD进行计算

 

DStream层次关系

 

 

DStream的window操作

Java代码  
收藏代码

  1. /** 
  2.  * Return a new DStream in which each RDD contains all the elements in seen in a 
  3.  * sliding window of time over this DStream. 
  4.  * @param windowDuration width of the window; must be a multiple of this DStream’s 
  5.  *                       batching interval 
  6.  * @param slideDuration  sliding interval of the window (i.e., the interval after which 
  7.  *                       the new DStream will generate RDDs); must be a multiple of this 
  8.  *                       DStream’s batching interval 
  9.  */  
  10. def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = {  
  11.   new WindowedDStream(this, windowDuration, slideDuration)  
  12. }  

DStream与window相关的两个参数是windowDuration和slideDuration,这两个参数究竟表示什么含义。通过window操作,DStream转换为了WindowedDStream

windowDuration表示的是对过去的一个windowDuration时间间隔的数据进行统计计算, windowDuration是intervalBatch的整数倍,也就是说,假如windowDuration=n*intervalBatch, 那么window操作就是对过去的n个RDD进行统计计算
如下内容来自于Spark Streaming的官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html

 

Spark Streaming也提供了窗口计算(window computations)的功能,允许我们每隔一段时间(sliding duration)对过去一个时间段内(window duration)的数据进行转换操作(tranformation).

slideDruation控制着窗口计算的频度,windowDuration控制着窗口计算的时间跨度。slideDruation和windowDuration都必须是batchInterval的整数倍。假想如下一种场景:

windowDuration=3*batchInterval,

slideDuration=10*batchInterval,

表示的含义是每个10个时间间隔对之前的3个RDD进行统计计算,也意味着有7个RDD没在window窗口的统计范围内。slideDuration的默认值是batchInterval

 

 

下图展示了滑动窗口的概念
 

 

 
Spark Streaming详解(重点窗口计算)
如上图所示,一个滑动窗口时间段((sliding window length)内的所有RDD会进行合并以创建windowed DStream所对应的RDDD。每个窗口操作有两个参数:

 

  • window length – The duration of the window (3 in the figure),滑动窗口的时间跨度,指本次window操作所包含的过去的时间间隔(图中包含3个batch interval,可以理解时间单位)
  • sliding interval – The interval at which the window operation is performed (2 in the figure).(窗口操作执行的频率,即每隔多少时间计算一次)

These two parameters must be multiples of the batch interval of the source DStream (1 in the figure). 这表示,sliding window length的时间长度以及sliding interval都要是batch interval的整数倍。
batch interval是在构造StreamingContext时传入的(1 in the figure)
 

说明:

window length为什么是3?如椭圆形框,它是从第三秒开始算起(包括第三秒),第五秒结束,即包含3,4,5三个1秒,因此是3

sliding interval为什么是2?主要是看圆角矩形框的右边线,虚线的圆角矩形框的右边线在time 3结束, 实线的圆角矩形框的右边线在time 5结束,所以跨度是2。也就是看时间的最右侧即可,以右边线为基准,每个窗口操作(window length)占用了3个时间片。

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
表示每隔10秒钟对过去30秒钟产生的单词进行计数。这个方法有个不合理的地方,既然要求sliding window length和sliding interval都是batch interval的整数倍,那么此处为什么不用时间单位,而使用绝对的时间长度呢?

 

 

 

Spark Streaming Sources

这是Spark Streaming的数据输入源,包括两类:基本数据源和高级数据源

 

基本数据源

  • file systems
  • socket connections
  • Akka actors

以上数据源,StreamingContext的API直接提供,

  • fileStream,

监听HDFS文件系统的新文件的创建,读取其中内容。如果文件已存在而内容有变化,是不会被监听到的,因此只能将文件内容在某个位置写好后,然后移动到Spark Streaming监听的目录,如果文件在这个目录下内容发生变化,则Spark Streaming无法监听到

 

另外需要注意的是,Spark Streaming启动后,Spark Streaming通过文件的最后修改时间(modify time)来判断一个新加入到监听目录的文件是否有效。如果一个较长时间没有更新的文件move到监听目录,Spark Streaming也不会对它进行读取进而计算

 

Java代码  
收藏代码

  1. /** 
  2.  * Create a input stream that monitors a Hadoop-compatible filesystem 
  3.  * for new files and reads them using the given key-value types and input format. 
  4.  * Files must be written to the monitored directory by “moving” them from another 
  5.  * location within the same file system. File names starting with . are ignored. 
  6.  * @param directory HDFS directory to monitor for new file 
  7.  * @tparam K Key type for reading HDFS file 
  8.  * @tparam V Value type for reading HDFS file 
  9.  * @tparam F Input format for reading HDFS file 
  10.  */  
  11. def fileStream[  
  12.   K: ClassTag,  
  13.   V: ClassTag,  
  14.   F <: NewInputFormat[K, V]: ClassTag  
  15. ] (directory: String): InputDStream[(K, V)] = {  
  16.   new FileInputDStream[K, V, F](this, directory)  
  17. }  

 

  • socket connections

 

Java代码  
收藏代码

  1. /** 
  2.  * Create an input stream from TCP source hostname:port. Data is received using 
  3.  * a TCP socket and the receive bytes it interepreted as object using the given 
  4.  * converter. 
  5.  * @param hostname      Hostname to connect to for receiving data 
  6.  * @param port          Port to connect to for receiving data 
  7.  * @param converter     Function to convert the byte stream to objects 
  8.  * @param storageLevel  Storage level to use for storing the received objects 
  9.  * @tparam T            Type of the objects received (after converting bytes to objects) 
  10.  */  
  11. def socketStream[T: ClassTag](  
  12.     hostname: String,  
  13.     port: Int,  
  14.     converter: (InputStream) => Iterator[T],  
  15.     storageLevel: StorageLevel  
  16.   ): ReceiverInputDStream[T] = {  
  17.   new SocketInputDStream[T](this, hostname, port, converter, storageLevel)  
  18. }  

 问题: converter怎么使用?把InputStream转换为Iterator[T]集合

 

 高级数据源

 

Source Artifact

Kafka spark-streaming-kafka_2.10
Flume spark-streaming-flume_2.10
Kinesis spark-streaming-kinesis-asl_2.10 [Amazon Software License]
Twitter spark-streaming-twitter_2.10
ZeroMQ spark-streaming-zeromq_2.10
MQTT spark-streaming-mqtt_2.10

 

Spark Streaming注意点:

 1. When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using a input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL where n > number of receivers to run (see Spark Properties for information on how to set the master).Extending the logic to running on a cluster, the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process them.

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/148736.html原文链接:https://javaforall.net

(0)
上一篇 2022年6月26日 下午11:00
下一篇 2022年6月26日 下午11:00


相关推荐

  • python控制谷歌浏览器_python安装插件

    python控制谷歌浏览器_python安装插件在进行UI自动化之前,需要安装各个浏览器,以及调用浏览器时用到的驱动。以下以安装谷歌为例。

    2025年6月9日
    3
  • 指针函数与函数指针的用法与区别

    指针函数与函数指针的用法与区别一 区别 1 指针函数 指的就是返回值是指针的函数 本质就是个函数 2 函数指针 指的是指向函数的指针变量 本质就是个指针 二 用法 1 指针函数 int func inta intb 普通函数就是 intfunc inta intb 返回值是 int 而指针函数就是返回值是指针的函数 即返回值是 int 2 函数指针 int func intx inty 这里

    2025年6月19日
    3
  • 三大通信协议(二):IIC通信协议

    三大通信协议(二):IIC通信协议1.概念是什么?I²C(Inter-IntegratedCircuit),中文应该叫集成电路总线,它是一种串行通信总线,使用多主从架构,是由飞利浦公司在1980年代初设计的,方便了主板、嵌入式系统或手机与周边设备组件之间的通讯。由于其简单性,它被广泛用于微控制器与传感器阵列,显示器,IoT设备,EEPROM等之间的通信。优点仅需要两条总线即可通讯(大大的节约了IO口资源)最大主机数量:无限制。最大从机限制:理论127(一个主机多个从机,一对多,多对一,多对多)2.硬件连

    2022年5月5日
    123
  • C语言数组初始化

    C语言数组初始化转载博客代码编译运行环境:VS2017+Win32+Debug1.字符数组的初始化方式C语言中表示字符串有两种方式,数组和指针,字符数组是我们经常使用的方式。变量的定义包括指明变量所属类型、变量名称、分配空间以及初始化。可以看出,变量的初始化是变量定义的一部分。除了const变量需要显示初始化以外,其它变量如果在定义时未显示初始化,编译器会为变量以默认…

    2022年7月18日
    21
  • 旁路由Openwrt设置

    旁路由Openwrt设置旁路由Openwrt设置完成!

    2022年6月10日
    33
  • handlersocket php,handlersocket安装配置

    handlersocket php,handlersocket安装配置一、安装handlersocket下载地址:https://nodeload.github.com/ahiguti/HandlerSocket-Plugin-for-MySQL/tarball/mastertarxfahiguti-HandlerSocket-Plugin-for-MySQL-1.0.6-88-gefd9972.tar.gzcdahiguti-HandlerSocket-…

    2022年8月24日
    10

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号