SparkStreaming之foreachRDD

SparkStreaming之foreachRDD首先我们来对官网的描述了解一下。DStream中的foreachRDD是一个非常强大函数,它允许你把数据发送给外部系统。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是DStream转换。所以要掌握它,对它要有深入了解。下面有一些常用的错误需要理解。经常写数据到外部系统需要创建一个连接的object(eg:根据TCP协议连接到远程的服务器,我们连接外部数据库需要自己的句柄

大家好,又见面了,我是你们的朋友全栈君。

首先我们来对官网的描述了解一下。

DStream中的foreachRDD是一个非常强大函数,它允许你把数据发送给外部系统。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是DStream转换。所以要掌握它,对它要有深入了解。下面有一些常用的错误需要理解。经常写数据到外部系统需要创建一个连接的object(eg:根据TCP协议连接到远程的服务器,我们连接外部数据库需要自己的句柄)和发送数据到远程的系统为此,开发者需要在Spark的driver创建一个object用于连接。

为了达到这个目的,开发人员可能不经意的在Spark驱动中创建一个连接对象,但是在Spark worker中 尝试调用这个连接对象保存记录到RDD中,如下

dstream.foreachRDD { 
    rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { 
    record =>
    connection.send(record) // executed at the worker
  }
}

这是不正确的,因为这需要先序列化连接对象,然后将它从driver发送到worker中。这样的连接对象在机器之间不能

传送。它可能表现为序列化错误(连接对象不可序列化)或者初始化错误(连接对象应该 在worker中初始化)等

等。正确的解决办法是在worker中创建连接对象。

然而,这会造成另外一个常见的错误-为每一个记录创建了一个连接对象。例如:

dstream.foreachRDD { 
    rdd =>
  rdd.foreach { 
    record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

通常,创建一个连接对象有资源和时间的开支。因此,为每个记录创建和销毁连接对象会导致非常高的开支,明显

的减少系统的整体吞吐量。一个更好的解决办法是利用rdd.foreachPartition方法。 为RDD的partition创建一个连接对

象,用这个两件对象发送partition中的所有记录。

dstream.foreachRDD { 
    rdd =>
  rdd.foreachPartition { 
    partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

最后,可以通过在多个RDD或者批数据间重用连接对象做更进一步的优化。开发者可以保有一个静态的连接对象

池,重复使用池中的对象将多批次的RDD推送到外部系统,以进一步节省开支

dstream.foreachRDD { 
    rdd =>
  rdd.foreachPartition { 
    partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

需要注意的是,池中的连接对象应该根据需要延迟创建,并且在空闲一段时间后自动超时。这样就获取了最有效的

方式发生数据到外部系统。

其它需要注意的地方:

(1)输出操作通过懒执行的方式操作DStreams,正如RDD action通过懒执行的方式操作RDD。具体地看,RDD 

actions和DStreams输出操作接收数据的处理。因此,如果你的应用程序没有任何输出操作或者 用于输出操作

dstream.foreachRDD(),但是没有任何RDD action操作在dstream.foreachRDD()里面,那么什么也不会执行。系统

仅仅会接收输入,然后丢弃它们。

(2)默认情况下,DStreams输出操作是分时执行的,它们按照应用程序的定义顺序按序执行。


实验1:把SparkStreaming的内部数据存入Mysql

(1)在mysql中创建一个表用于存放数据

mysql> create database sparkStreaming;
Query OK, 1 row affected (0.01 sec)
 
mysql> use sparkStreaming;
Database changed
mysql> show tables;
Empty set (0.01 sec)
 
mysql> create table searchKeyWord(insert_time date,keyword varchar(30),search_count integer);
Query OK, 0 rows affected (0.05 sec)

(2)用scala编写连接Mysql的连接池

import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet

import org.apache.commons.dbcp.BasicDataSource
import org.apache.log4j.Logger

object scalaConnectPool {
  val  log = Logger.getLogger(scalaConnectPool.this.getClass)
  var ds:BasicDataSource = null
  def getDataSource={
    if(ds == null){
      ds = new BasicDataSource()
      ds.setUsername("root")
      ds.setPassword("iamhaoren")
      ds.setUrl("jdbc:mysql://localhost:3306/sparkStreaming")
      ds.setDriverClassName("com.mysql.jdbc.Driver")
      ds.setInitialSize(20)
      ds.setMaxActive(100)
      ds.setMinIdle(50)
      ds.setMaxIdle(100)
      ds.setMaxWait(1000)
      ds.setMinEvictableIdleTimeMillis(5*60*1000)
      ds.setTimeBetweenEvictionRunsMillis(10*60*1000)
      ds.setTestOnBorrow(true)
    }
    ds
  }

  def getConnection : Connection= {
    var connect:Connection = null
    try {
      if(ds != null){
        connect = ds.getConnection
      }else{
        connect = getDataSource.getConnection
      }
    }
    connect
  }

  def shutDownDataSource: Unit=if (ds !=null){ds.close()}

  def closeConnection(rs:ResultSet,ps:PreparedStatement,connect:Connection): Unit ={
    if(rs != null){rs.close}
    if(ps != null){ps.close}
    if(connect != null){connect.close}
  }

}

(3)编写SparkStreaming程序

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object dataToMySQL {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("use the foreachRDD write data to mysql").setMaster("local[2]")
    val ssc = new StreamingContext(conf,Seconds(10))

    val streamData = ssc.socketTextStream("master",9999)
    val wordCount = streamData.map(line =>(line.split(",")(0),1)).reduceByKeyAndWindow(_+_,Seconds(60))
    val hottestWord = wordCount.transform(itemRDD => {
      val top3 = itemRDD.map(pair => (pair._2, pair._1))
        .sortByKey(false).map(pair => (pair._2, pair._1)).take(3)
      ssc.sparkContext.makeRDD(top3)
    })


    hottestWord.foreachRDD( rdd =>{
      rdd.foreachPartition(partitionOfRecords =>{
        val connect = scalaConnectPool.getConnection
        connect.setAutoCommit(false)
        val stmt = connect.createStatement()
        partitionOfRecords.foreach(record =>{
          stmt.addBatch("insert into searchKeyWord (insert_time,keyword,search_count) values (now(),'"+record._1+"','"+record._2+"')")
        })
        stmt.executeBatch()
        connect.commit()
      }
      )
    }
    )



    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

(4)编写一个socket端的数据模拟器

import java.io.{PrintWriter}  
import java.net.ServerSocket  
import scala.io.Source  
  
  
object streamingSimulation {  
  def index(n: Int) = scala.util.Random.nextInt(n)  
  
  def main(args: Array[String]) {  
    // 调用该模拟器需要三个参数,分为为文件路径、端口号和间隔时间(单位:毫秒)  
    if (args.length != 3) {  
      System.err.println("Usage: <filename> <port> <millisecond>")  
      System.exit(1)  
    }  
  
    // 获取指定文件总的行数  
    val filename = args(0)  
    val lines = Source.fromFile(filename).getLines.toList  
    val filerow = lines.length  
  
    // 指定监听某端口,当外部程序请求时建立连接  
    val listener = new ServerSocket(args(1).toInt)  
  
    while (true) {  
      val socket = listener.accept()  
      new Thread() {  
        override def run = {  
          println("Got client connected from: " + socket.getInetAddress)  
          val out = new PrintWriter(socket.getOutputStream(), true)  
          while (true) {  
            Thread.sleep(args(2).toLong)  
            // 当该端口接受请求时,随机获取某行数据发送给对方  
            val content = lines(index(filerow))  
            println("-------------------------------------------")  
            println(s"Time: ${System.currentTimeMillis()}")  
            println("-------------------------------------------")  
            println(content)  
            out.write(content + '\n')  
            out.flush()  
          }  
          socket.close()  
        }  
      }.start()  
    }  
  }  
  
}  

实验数据为:

spark
Streaming
better
than
storm
you
need
it
yes
do
it

(5)实验启动

在客户端启动数据流模拟

对socket端的数据模拟器程序进行 jar文件的打包,并放入到集群目录中

启动程序如下:

java -cp DataSimulation.jar streamingSimulation /root/application/upload/Information 9999 1000

启动SparkStreaming程序

结果如下:
SparkStreaming之foreachRDD

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/148544.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • SQL Server 2008安装教程

    SQL Server 2008安装教程1,打开后点击左边项中的“安装”。2,点击右边第一项。3,点击“确定”。4,输入产品密匙PTTFM-X467G-P7RH2-3Q6CG-4DMYB后点击“下一步”。5,安装好后在下一界面点击“全选”并设置共享目录。6,接着是“实例配置”界面,在此界面选择“默认实例”,并设置实例根目录。7,到了“服务器配置”页面,我们需要根据自己所需进行相关设置。点击对所有服务使用

    2022年6月23日
    23
  • 关于使用preparestatement来实现模糊查询

    关于使用preparestatement来实现模糊查询关于使用prparestatement来实现模糊查询

    2022年6月4日
    35
  • 自定义progressbar样式_样式

    自定义progressbar样式_样式android ProgressBar 样式讲解

    2022年4月21日
    49
  • socket原理讲解_电感器的作用及原理

    socket原理讲解_电感器的作用及原理1.网络中进程之间如何通信进程通信的概念最初来源于单机系统。由于每个进程都在自己的地址范围内运行,为保证两个相互通信的进程之间既互不干扰又协调一致工作,操作系统为进程通信提供了相应设施,如UNIXBSD有:管道(pipe)、命名管道(namedpipe)软中断信号(signal)UNIXsystemV有:消息(message)、共享存储区(sharedmemory)和信号量(semaphore)等.他们都仅限于用在本机进程之间通信。网间进程通信要解决的是不同主机进程间的相互

    2022年10月10日
    0
  • vue 双向绑定原理及依赖搜集的过程「建议收藏」

    vue 双向绑定原理及依赖搜集的过程「建议收藏」双向数据绑定机制:官方:vue是采用数据劫持结合发布者-订阅者模式的方式,通过Object.defineProperty()来劫持各个属性的setter,getter,在数据变动时发布消息给订阅者,触发响应的监听回调。第一步:需要observer的数据对象进行递归遍历,包括子属性对象的属性,都加上setter和getter,这样的话,给这个对象的某个值赋值,就会触发setter,那么就能监听到了数据变化第二步:compile解析模板令,将模板中的变量替换成数据.然后初始化渲染页面视图,并将每个令对

    2022年10月17日
    0
  • 打印纸张尺寸换算_纸张尺寸对照表

    打印纸张尺寸换算_纸张尺寸对照表常用纸张的尺寸大小对照表-纸张规格对照表纸张的大小国际标准化组织的ISO216国际标准指明了大多数国家使用的标准纸张的尺寸。此标准源自德国,在1922年通过,定义了A、B、C三组纸张尺寸,C组纸张尺寸主要用于信封。另外,有些国家也有自己的标准,如美国,日本。这里主要是指办公用纸。下面是一些标准纸张的具体尺寸。单位:mmISO216AA0A1A2A3A4A5A…

    2022年6月20日
    237

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号