Spark Streaming 实现 word count

Spark Streaming 实现 word countSparkStreami 实现 wordcount 一 一个输入源端口对应一个 receiver1 1 数据源端口 1 2sparkstream 接收处理数据二 两个输入源端口对应一个 receiver2 1 测试源端口一 一个输入源端口对应一个 receiver1 1 数据源端口使用网络猫作为数据的输入源端口下载网络猫 linux 命令行执行 yuminstall ync 使用测试端口 9999nc lk99991 2sparkstream 接收处理数据这

一、一个receiver接收一个输入源端口

1.1 数据源端口

yum install -y nc 

使用测试端口: 9999

nc -lk 9999 

1.2 spark streaming 接收处理数据

① spark shell 本地模式运行

  • 启动spark shell命令行
spark-shell 

在这里插入图片描述

  1. 导包
scala> import org.apache.spark._ import org.apache.spark._ scala> import org.apache.spark.streaming._ import org.apache.spark.streaming._ 
  1. 创建 spark streaming context对象
    注: 需先停止默认的 Spark context 对象 sc,因为一个JVM线程中只能创建一个 SparkContext对象,默认创建的 Spark context 对象没有定义窗口信息、线程等其他配置项,而需要自定义一些配置项时,就必须要新创建一个 Spark context 对象,设置配置信息,所以要先停止 sc
    local [ n ] 中的 n 要大于receiver 的数量

scala> sc.stop scala> val conf = new SparkConf().setMaster("local[2]").setAppName("workWC1") conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@5d08a65c // 设置窗口期时间为3s scala> val ssc = new StreamingContext(conf,Seconds(3)) ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@44a0e68f 
  1. 使用 socketTextStream 连接上端口 9999
scala> val lines = ssc.socketTextStream("localhost",9999) lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@ 
  1. 使用 flatMap(_.split()) 切分展开数据源输入的每条数据
scala> val words = lines.flatMap(_.split(" ")) words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@22f756c5 
  1. 将切分每条数据的每个元素组成元组形式的 (key,1)
scala> val pairs = words.map(word => (word,1)) pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@6a16ee0d 
  1. 使用 reduceByKey(+) 对每个key聚合,统计key的数量
scala> val wordCounts = pairs.reduceByKey(_+_) wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@7b031f53 
  1. 打印输出结果
scala> wordCounts.print() 
  1. 启动 spark streaming
ssc.start() 
  1. 在测试端口 9999 输入测试数据:
hello word hello tom hello jerry hello word 

在这里插入图片描述

  1. 输入 ctrl + D 结束任务

② java高级API运行

  1. 配置pom文件:pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>SparkStreaming-pr</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <encoding>UTF-8</encoding> <scala.version>2.11.8</scala.version> <spark.version>2.2.0</spark.version> <hadoop.version>2.7.1</hadoop.version> <scala.compat.version>2.11</scala.compat.version> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-flume_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> </dependencies> <build> <sourceDirectory>src/main/scala</sourceDirectory> <!--<testSourceDirectory>src/test/scala</testSourceDirectory>--> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter><artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass></mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project> 
  1. 创建main目录下创建 resources 文件夹,拷贝一份 log4j.properties 文件
    在这里插入图片描述
    设置为资源目录:
    在这里插入图片描述


log4j.properties

# contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Set everything to be logged to the console log4j.rootCategory= WARN, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n 
  1. 创建测试类:SparkStreamingWordCount.scala
package pr import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{ 
   DStream, ReceiverInputDStream} import org.apache.spark.streaming.{ 
   Seconds, StreamingContext} object SparkStreamingWordCount extends App{ 
    //todo 创建一个spark StreamingContext对象 val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("demo1") val ssc = new StreamingContext(conf, Seconds(5)) //todo 使用spark streaming进行word count val inputDstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop-single", 9999) //todo 对输入流进行操作 val wordDstream: DStream[String] = inputDstream.flatMap(_.split(" ")) val wordAndOneDstream: DStream[(String, Int)] = wordDstream.map((_, 1)) val wordcounts: DStream[(String, Int)] = wordAndOneDstream.reduceByKey(_ + _) wordcounts.print() //todo 通过start() 启动消息采集和处理 ssc.start() //todo 等待程序终止 ssc.awaitTermination() } 
  1. 运行程序
  2. 在测试端口 9999 输入测试数据:
hello word hello tom hello jerry hello word 
  1. IDEA控制台打印结果如下:
    在这里插入图片描述

二、两个receiver接收两个输入源端口

2.1 测试源端口

  1. 测试端口1: 9999
nc -lk 9999 
  1. 测试端口2: 3333
nc -lk 3333 

2.2 spark streaming 接收处理数据

spark shell 命令行:

scala> sc.stop scala> import org.apache.spark._ import org.apache.spark._ scala> import org.apache.spark.streaming._ import org.apache.spark.streaming._ scala> val conf = new SparkConf().setMaster("local[3]").setAppName("workWC2") conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@7d95a717 scala> val ssc = new StreamingContext(conf,Seconds(3)) ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@58d4fe33 // 使用 union 联合两个receiver scala> val lines = ssc.socketTextStream("localhost",9999).union(ssc.socketTextStream("localhost",3333)) lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.UnionDStream@5263f554 scala> val words = lines.flatMap(_.split(" ")) words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@ scala> val pairs = words.map(word => (word,1)) pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@386ec37d scala> val wordCounts = pairs.reduceByKey(_+_) wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@50d02417 scala> wordCounts.print() scala> ssc.start() 

分别在 99993333 窗口同时输入

hello word hello spark 

可以看到词频统计 union 在了一起
在这里插入图片描述
如果在3s窗口期只输入一次数据,那么只会统计一次词频
在这里插入图片描述


注: receiver也可以先分开写再 union

val lines1 = ssc.socketTextStream("localhost",9999) val lines2 = ssc.socketTextStream("localhost",3333) val lines = lines1.union(lines2) 

三、设置窗口的时长与步长统计(一个receiver一个socket)

3.1 测试源端口

  1. 测试端口1: 9999
nc -lk 9999 

3.2 spark streaming 接收处理数据

spark shell 命令行:

  • 设置窗口时间30s,移动长度10s reduceByKeyAndWindow(x:Int,y:Int) => x+y,Seconds(30),Seconds(10))
sc.stop import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setMaster("local[2]").setAppName("workWC3") val ssc = new StreamingContext(conf,Seconds(5)) val lines = ssc.socketTextStream("localhost",9999) val wcStream = lines.flatMap(_.split(" ")).map(x => (x,1)).reduceByKeyAndWindow((x:Int,y:Int) => x+y,Seconds(30),Seconds(10)) wcStream.print() ssc.start 

在 9999 端口输入:

hello word hello spark 

可以看到 虽然spark streaming context 对象设置了窗口期为5秒,但是 reduceByKeyAndWindow 设置了窗口的移动不出为10秒,这里会以美10秒一个步长显示统计词频
在这里插入图片描述
在这里插入图片描述


四、读取hdfs上的数据实现WordCount

  1. hdfs上创建文件夹
hdfs dfs -mkdir -p /SparkStreaming/test 
  1. 创建测试类: HDFSInputDStreamDemo.scala
package pr import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{ 
     Seconds, StreamingContext} object HDFSInputDStreamDemo extends App{ 
      val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("hdfsDemo") val ssc = new StreamingContext(conf, Seconds(5)) //todo 创建一个输入流,读取文件系统上的数据(文件在hdfs中需要是新放入的,已经存在文件夹中的文件是不会读的) val lines: DStream[String] = ssc.textFileStream("hdfs://hadoop-single:9000/SparkStreaming/test") val wordcounts: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) wordcounts.print() ssc.start() ssc.awaitTermination() } 
  1. 启动程序
  2. 将文本数据放至hdfs指定文件夹中:
    注意: 文件在hdfs中需要是新放入的,已经存在文件夹中的文件是不会读的

wordCount.txt

hello world hello people hello java hadoop scala spark hive java spark hive 

上传文件:

hdfs dfs -put wordCounts.txt /SparkStreaming/test 
  1. IDEA控制台打印结果如下:
    在这里插入图片描述

五、两个InputDStream join连接

  1. 创建测试类: JoinDemo.scala
package pr import org.apache.spark.{ 
     SparkConf, SparkContext} import org.apache.spark.streaming.{ 
     Seconds, StreamingContext} object JoinDemo extends App { 
      val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JoinDemo") val ssc = new StreamingContext(conf,Seconds(5)) val sc: SparkContext = ssc.sparkContext val input1 = List((1, true), (2, false), (3, false), (4, true), (5, false)) val input2 = List((1, false), (2, false), (3, true), (4, true), (5, true)) val rdd1 = sc.parallelize(input1) val rdd2 = sc.parallelize(input2) import scala.collection.mutable //todo 通过queueStream的方式创建InputDStream,一般测试的环境下使用 val ds1 = ssc.queueStream[(Int, Boolean)](mutable.Queue(rdd1)) val ds2 = ssc.queueStream[(Int, Boolean)](mutable.Queue(rdd2)) val ds = ds1.join(ds2) ds.print() ssc.start() ssc.awaitTerminationOrTimeout(5000) } 
  1. 运行程序
  2. IDEA控制台打印结果如下:
    在这里插入图片描述



版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/233128.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • Java获取客户端IP[通俗易懂]

    转载地址:Java获取客户端IP 在开发工作中,我们常常需要获取客户端的IP。一般获取客户端的IP地址的方法是:request.getRemoteAddr();但是在通过了Apache,Squid等反向代理软件就不能获取到客户端的真实IP地址了。原因:由于在客户端和服务之间增加了中间代理,因此服务器无法直接拿到客户端的IP,服务器端应用也无法直接通过转发请求的地址返回给客户端。现在图示代理上网和I

    2022年2月25日
    37
  • dh参数建模_data vault 建模

    dh参数建模_data vault 建模仅供个人学习记录DH法一般用一次就丢,然后后面再需要用的时候就会忘,所以本文整理了DH建模法,方便需要使用的时候进行参考DH法可分成以下几步:辨认出关节和连杆确定Z轴确定每个坐标系的原点确定XY轴确定Toolframe写出DH参数表写出转换方程其中对于第二步情况也要分为两种旋转关节平动关节其中对于第四步情况分为三种:Zi与Zi-1不共面Zi与Zi-1平行…

    2022年9月15日
    3
  • Okio源码分析

    Okio源码分析【参考资料】拆轮子系列:拆Okio

    2022年4月30日
    36
  • ubuntu下rabbitvcs安装后无右键菜单解决办法

    ubuntu下rabbitvcs安装后无右键菜单解决办法1、sudorabbitvcs2、rabbitvcs3、nautilus-q4、ls-ldxxx/RabbitVCS.logsudochown-R’currentuser’xxx/RabbitVCS.log

    2022年7月18日
    14
  • LaTeX 换行、换页、空白空间[通俗易懂]

    LaTeX 换行、换页、空白空间[通俗易懂]一般来说,我们不推荐你改变默认的LaTeX文档结构。当然,我们有时候也有这个需求。所以,在本文中,我们将解释如何在文档中插入空行,以及插入任意的空白。

    2022年5月14日
    148
  • bytebuf详解_byte int

    bytebuf详解_byte int@author鲁伟林记录《Netty实战》中各章节学习过程,写下一些自己的思考和总结,帮助使用Netty框架的开发技术人员们,能够有所得,避免踩坑。本博客目录结构将严格按照书本《Netty实战》,省略与Netty无关的内容,可能出现跳小章节。本博客中涉及的完整代码:GitHub地址:https://github.com/thinkingfioa/netty-learning/tre…

    2022年9月19日
    4

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号