Spark Streaming Join

Spark Streaming Join多数据源Join思路多数据源Join大致有以下三种思路:数据源端Join,如Android/IOS客户端在上报用户行为数据时就获取并带上用户基础信息。计算引擎上Join,如用SparkStreaming、Flink做Join。结果端Join,如用HBase/ES做Join,Join键做Rowkey/_id,各字段分别写入列簇、列或field。三种思路各有优劣,使用时注意…

大家好,又见面了,我是你们的朋友全栈君。

多数据源Join思路

多数据源Join大致有以下三种思路:

  • 数据源端Join,如Android/IOS客户端在上报用户行为数据时就获取并带上用户基础信息。

  • 计算引擎上Join,如用Spark Streaming、Flink做Join。

  • 结果端Join,如用HBase/ES做Join,Join键做Rowkey/_id,各字段分别写入列簇、列或field。

三种思路各有优劣,使用时注意一下。这里总结在计算引擎Spark Streaming上做Join。

Stream-Static Join

流与完全静态数据Join

流与完全静态数据Join。有两种方式,一种是RDD Join方式,另一种是Broadcast Join(也叫Map-Side Join)方式。

RDD Join 方式

思路:RDD Join RDD 。

package com.bigData.spark

import com.alibaba.fastjson.{ 
   JSON, JSONException, JSONObject}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{ 
   Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ 
   ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{ 
   Durations, StreamingContext}

/** * Author: Wang Pei * License: Copyright(c) Pei.Wang * Summary: * * Stream-Static Join * * spark 2.2.2 * */
case class UserInfo(userID:String,userName:String,userAddress:String)
object StreamStaicJoin { 
   
  def main(args: Array[String]): Unit = { 
   

    //设置日志等级
    Logger.getLogger("org").setLevel(Level.WARN)

    //Kafka 参数
    val kafkaParams= Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean),
      "group.id" -> "testTopic3_consumer_v1")

    //spark环境
    val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.replace("$","")).setMaster("local[3]")
    val ssc = new StreamingContext(sparkConf,Durations.seconds(10))

    /** 1) 静态数据: 用户基础信息*/
    val userInfo=ssc.sparkContext.parallelize(Array(
      UserInfo("user_1","name_1","address_1"),
      UserInfo("user_2","name_2","address_2"),
      UserInfo("user_3","name_3","address_3"),
      UserInfo("user_4","name_4","address_4"),
      UserInfo("user_5","name_5","address_5")
    )).map(item=>(item.userID,item))


    /** 2) 流式数据: 用户发的tweet数据*/
    /** 数据示例: * eventTime:事件时间、retweetCount:转推数、language:语言、userID:用户ID、favoriteCount:点赞数、id:事件ID * {"eventTime": "2018-11-05 10:04:00", "retweetCount": 1, "language": "chinese", "userID": "user_1", "favoriteCount": 1, "id": 4909846540155641457} */

    val kafkaDStream=KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String,String](Set("testTopic3"),kafkaParams)
    ).map(item=>parseJson(item.value())).map(item=>{ 
   
      val userID = item.getString("userID")
      val eventTime = item.getString("eventTime")
      val language= item.getString("language")
      val favoriteCount = item.getInteger("favoriteCount")
      val retweetCount = item.getInteger("retweetCount")
      (userID,(userID,eventTime,language,favoriteCount,retweetCount))
    })


    /** 3) 流与静态数据做Join (RDD Join 方式)*/
    kafkaDStream.foreachRDD(_.join(userInfo).foreach(println))

    ssc.start()
    ssc.awaitTermination()

  }

  /**json解析*/
  def parseJson(log:String):JSONObject={ 
   
    var ret:JSONObject=null
    try{ 
   
      ret=JSON.parseObject(log)
    }catch { 
   
      //异常json数据处理
      case e:JSONException => println(log)
    }
    ret
  }

}

stream_static_rdd_join.png

Broadcast Join 方式

思路:RDD遍历每一条数据,去匹配广播变量中的值。

package com.bigData.spark

import com.alibaba.fastjson.{ 
   JSON, JSONException, JSONObject}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{ 
   Level, Logger}
import org.apache.spark.{ 
   SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010.{ 
   ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{ 
   Durations, StreamingContext}

/** * Author: Wang Pei * License: Copyright(c) Pei.Wang * Summary: * * Stream-Static Join * * spark 2.2.2 * */
case class UserInfo(userID:String,userName:String,userAddress:String)
object StreamStaticJoin2 { 
   
  def main(args: Array[String]): Unit = { 
   

    //设置日志等级
    Logger.getLogger("org").setLevel(Level.WARN)

    //Kafka 参数
    val kafkaParams= Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean),
      "group.id" -> "testTopic3_consumer_v1")

    //spark环境
    val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.replace("$","")).setMaster("local[3]")
    val ssc = new StreamingContext(sparkConf,Durations.seconds(10))

    /** 1) 静态数据: 用户基础信息。 将用户基础信息广播出去。*/
    val broadcastUserInfo=ssc.sparkContext.broadcast(
      Map(
        "user_1"->UserInfo("user_1","name_1","address_1"),
        "user_2"->UserInfo("user_2","name_2","address_2"),
        "user_3"->UserInfo("user_3","name_3","address_3"),
        "user_4"->UserInfo("user_4","name_4","address_4"),
        "user_5"->UserInfo("user_5","name_5","address_5")
      ))


    /** 2) 流式数据: 用户发的tweet数据*/
    /** 数据示例: * eventTime:事件时间、retweetCount:转推数、language:语言、userID:用户ID、favoriteCount:点赞数、id:事件ID * {"eventTime": "2018-11-05 10:04:00", "retweetCount": 1, "language": "chinese", "userID": "user_1", "favoriteCount": 1, "id": 4909846540155641457} */
    val kafkaDStream=KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String,String](List("testTopic3"),kafkaParams)
    ).map(item=>parseJson(item.value())).map(item=>{ 
   
      val userID = item.getString("userID")
      val eventTime = item.getString("eventTime")
      val language= item.getString("language")
      val favoriteCount = item.getInteger("favoriteCount")
      val retweetCount = item.getInteger("retweetCount")
      (userID,(userID,eventTime,language,favoriteCount,retweetCount))
    })


    /** 3) 流与静态数据做Join (Broadcast Join 方式)*/
    val result=kafkaDStream.mapPartitions(part=>{ 
   
      val userInfo = broadcastUserInfo.value
      part.map(item=>{ 
   
        (item._1,(item._2,userInfo.getOrElse(item._1,null)))})
    })

    result.foreachRDD(_.foreach(println))


    ssc.start()
    ssc.awaitTermination()

  }

  /**json解析*/
  def parseJson(log:String):JSONObject={ 
   
    var ret:JSONObject=null
    try{ 
   
      ret=JSON.parseObject(log)
    }catch { 
   
      //异常json数据处理
      case e:JSONException => println(log)
    }
    ret
  }

}

stream_static_rdd_join2.png

流与半静态数据Join

半静态数据指的是放在Redis等的数据,会被更新。

思路:RDD 每个Partition连接一次Redis,遍历Partition中每条数据,根据k,去Redis中查找v。

package com.bigData.spark

import com.alibaba.fastjson.{ 
   JSON, JSONException, JSONObject}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{ 
   Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ 
   ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{ 
   Durations, StreamingContext}
import redis.clients.jedis.Jedis

/** * Author: Wang Pei * License: Copyright(c) Pei.Wang * Summary: * * Stream-Static Join * * spark 2.2.2 * */
object StreamStaicJoin3 { 
   
  def main(args: Array[String]): Unit = { 
   

    //设置日志等级
    Logger.getLogger("org").setLevel(Level.WARN)

    //Kafka 参数
    val kafkaParams= Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean),
      "group.id" -> "testTopic3_consumer_v1")

    //spark环境
    val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.replace("$","")).setMaster("local[3]")
    val ssc = new StreamingContext(sparkConf,Durations.seconds(10))

    /** 1) 半静态数据: 用户基础信息,在Redis中*/
    /** HMSET user_1 userID "user_1" name "name_1" address "address_1" */
    /** HMSET user_2 userID "user_2" name "name_2" address "address_2" */


    /** 2) 流式数据: 用户发的tweet数据*/
    /** 数据示例: * eventTime:事件时间、retweetCount:转推数、language:语言、userID:用户ID、favoriteCount:点赞数、id:事件ID * {"eventTime": "2018-11-05 10:04:00", "retweetCount": 1, "language": "chinese", "userID": "user_1", "favoriteCount": 1, "id": 4909846540155641457} */

    val kafkaDStream=KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String,String](Set("testTopic3"),kafkaParams)
    ).map(item=>parseJson(item.value())).map(item=>{ 
   
      val userID = item.getString("userID")
      val eventTime = item.getString("eventTime")
      val language= item.getString("language")
      val favoriteCount = item.getInteger("favoriteCount")
      val retweetCount = item.getInteger("retweetCount")
      (userID,(userID,eventTime,language,favoriteCount,retweetCount))
    })

    /** 3) 流与半静态数据做Join (RDD Join 方式)*/
    val result=kafkaDStream.mapPartitions(part=>{ 
   
      val redisCli=connToRedis("localhost",6379,3000,10)
      part.map(item=>{ 
   
        (item._1,(item._2,redisCli.hmget(item._1,"userID","name","address")))
      })
    })

    result.foreachRDD(_.foreach(println))


    ssc.start()
    ssc.awaitTermination()

  }

  /**json解析*/
  def parseJson(log:String):JSONObject={ 
   
    var ret:JSONObject=null
    try{ 
   
      ret=JSON.parseObject(log)
    }catch { 
   
      //异常json数据处理
      case e:JSONException => println(log)
    }
    ret
  }

  /**连接到redis*/
  def connToRedis(redisHost:String,redisPort:Int,timeout:Int,dbNum:Int): Jedis ={ 
   
    val redisCli=new Jedis(redisHost,redisPort,timeout)
    redisCli.connect()
    redisCli.select(dbNum)
    redisCli
  }

}

stream_static_join3.png

Stream-Stream Join

流与流Join。

思路:DStream Join DStream。

package com.bigData.spark

import com.alibaba.fastjson.{ 
   JSON, JSONException, JSONObject}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{ 
   Level, Logger}
import org.apache.spark.{ 
   SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010.{ 
   ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{ 
   Durations, StreamingContext}

/** * Author: Wang Pei * License: Copyright(c) Pei.Wang * Summary: * * Stream-Stream Join * * spark 2.2.2 * */
object StreamStreamJoin { 
   
  def main(args: Array[String]): Unit = { 
   

    //设置日志等级
    Logger.getLogger("org").setLevel(Level.WARN)

    //Kafka 参数
    val kafkaParams1= Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean),
      "group.id" -> "testTopic3_consumer_v1")

    val kafkaParams2= Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean),
      "group.id" -> "testTopic4_consumer_v1")


    //spark环境
    val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.replace("$","")).setMaster("local[3]")
    val ssc = new StreamingContext(sparkConf,Durations.seconds(10))

    /** 1) 流式数据: 用户发的tweet数据*/
    /** 数据示例: * eventTime:事件时间、retweetCount:转推数、language:语言、userID:用户ID、favoriteCount:点赞数、id:事件ID * {"eventTime": "2018-11-05 10:04:00", "retweetCount": 1, "language": "chinese", "userID": "user_1", "favoriteCount": 1, "id": 4909846540155641457} */

    val kafkaDStream1=KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String,String](List("testTopic3"),kafkaParams1)
    ).map(item=>parseJson(item.value())).map(item=>{ 
   
      val userID = item.getString("userID")
      val eventTime = item.getString("eventTime")
      val language= item.getString("language")
      val favoriteCount = item.getInteger("favoriteCount")
      val retweetCount = item.getInteger("retweetCount")
      (userID,(userID,eventTime,language,favoriteCount,retweetCount))
    })

    /** 2) 流式数据: 用户发的tweet数据*/
    /** 数据示例: * eventTime:事件时间、retweetCount:转推数、language:语言、userID:用户ID、favoriteCount:点赞数、id:事件ID * {"eventTime": "2018-11-05 10:04:00", "retweetCount": 1, "language": "chinese", "userID": "user_1", "favoriteCount": 1, "id": 4909846540155641457} */

    val kafkaDStream2=KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String,String](List("testTopic4"),kafkaParams2)
    ).map(item=>parseJson(item.value())).map(item=>{ 
   
      val userID = item.getString("userID")
      val eventTime = item.getString("eventTime")
      val language= item.getString("language")
      val favoriteCount = item.getInteger("favoriteCount")
      val retweetCount = item.getInteger("retweetCount")
      (userID,(userID,eventTime,language,favoriteCount,retweetCount))
    })

    /** 3) Stream-Stream Join*/
    val joinedDStream = kafkaDStream1.leftOuterJoin(kafkaDStream2)

    joinedDStream.foreachRDD(_.foreach(println))

    ssc.start()
    ssc.awaitTermination()

  }

  /**json解析*/
  def parseJson(log:String):JSONObject={ 
   
    var ret:JSONObject=null
    try{ 
   
      ret=JSON.parseObject(log)
    }catch { 
   
      //异常json数据处理
      case e:JSONException => println(log)
    }
    ret
  }

}

stream_stream_join.png

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/147399.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 先验概率和后验概率的定义是什么_先验和后验什么意思

    先验概率和后验概率的定义是什么_先验和后验什么意思话不多说,我因为在学习朴素贝叶斯的时候有点分不清楚先验概率、后验概率,所以就网上找了一些资料,大家各有各的理解,但感觉还是不太能从定义上区分,所以就有了下面这张图:图里面说的还是比较清晰的,大家有不理解的地方可以沟通交流嘛。…

    2022年10月18日
    5
  • b站超过1000万粉丝的up主(b站第一位千万up主)

    前几天一位好朋友入了B站,问我如何才能成为一名百万粉丝的up主。这不,于是我做了这篇的一些分析,知道了成为百万粉丝up主的一些小秘密。还做了一个昵称生成器,给其昵称起名提供建议。这是她的b站视频截图:关于昵称起名我的想法是这样,是我们把B站这些百万粉丝大佬的昵称分析一下成分构成,根据相关性随机起个名,是不是就有百万粉丝up主昵称的那味了?上面截图是她改名前的昵称,是否会改名,改名后叫什么咱们拭目以待。咱们现在就开始爬取整整:B站up主信息爬取直接通过b站首页去爬是很不方便的,这里我找到了两个第

    2022年4月18日
    294
  • 什么是差分数组?「建议收藏」

    什么是差分数组?「建议收藏」问题背景如果给你一个包含5000万个元素的数组,然后会有频繁区间修改操作,那什么是频繁的区间修改操作呢?比如让第1个数到第1000万个数每个数都加上1,而且这种操作时频繁的。此时你应该怎么做?很容易想到的是,从第1个数开始遍历,一直遍历到第1000万个数,然后每个数都加上1,如果这种操作很频繁的话,那这种暴力的方法在一些实时的系统中可能就拉跨了。因此,今天的主角就出现了——差分数组。…

    2022年4月28日
    44
  • vs生成动态库及使用动态库

    vs生成动态库及使用动态库动态库(.dll):动态库又称动态链接库英文为DLL,是DynamicLinkLibrary的缩写形式,DLL是一个包含可由多个程序同时使用的代码和数据的库,DLL不是可执行文件。动态链接提供了一种方法,使进程可以调用不属于其可执行代码的函数。函数的可执行代码位于一个DLL中,该DLL包含一个或多个已被编译、链接并与使用它们的进程分开存储的函数。DLL还有助于共享数据和资源。多个应用

    2022年9月25日
    4
  • js 删除换行符

    js 删除换行符mymsg=mymsg.replace(/<\/?.+?>/g,””);//html2txt去掉html标记mymsg=mymsg.replace(/\n|\r/g,””);//去掉换行转载于:https://www.cnblogs.com/jerryLee/archive/2010/02/01/1661036.html…

    2022年5月20日
    38
  • select into from 和 insert into select 的用法和区别

    select into from 和 insert into select 的用法和区别selectintofrom和insertintoselect都是用来复制表,两者的主要区别为:selectintofrom要求目标表不存在,因为在插入时会自动创建。insertintoselectfrom要求目标表存在 下面分别介绍两者语法 一、INSERTINTOSELECT语句 1、语句形式为:InsertintoTable2(field1…

    2022年7月15日
    23

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号