Spark Streaming 实现 word count

Spark Streaming 实现 word countSparkStreami 实现 wordcount 一 一个输入源端口对应一个 receiver1 1 数据源端口 1 2sparkstream 接收处理数据二 两个输入源端口对应一个 receiver2 1 测试源端口一 一个输入源端口对应一个 receiver1 1 数据源端口使用网络猫作为数据的输入源端口下载网络猫 linux 命令行执行 yuminstall ync 使用测试端口 9999nc lk99991 2sparkstream 接收处理数据这

一、一个receiver接收一个输入源端口

1.1 数据源端口

yum install -y nc 

使用测试端口: 9999

nc -lk 9999 

1.2 spark streaming 接收处理数据

① spark shell 本地模式运行

  • 启动spark shell命令行
spark-shell 

在这里插入图片描述

  1. 导包
scala> import org.apache.spark._ import org.apache.spark._ scala> import org.apache.spark.streaming._ import org.apache.spark.streaming._ 
  1. 创建 spark streaming context对象
    注: 需先停止默认的 Spark context 对象 sc,因为一个JVM线程中只能创建一个 SparkContext对象,默认创建的 Spark context 对象没有定义窗口信息、线程等其他配置项,而需要自定义一些配置项时,就必须要新创建一个 Spark context 对象,设置配置信息,所以要先停止 sc
    local [ n ] 中的 n 要大于receiver 的数量

scala> sc.stop scala> val conf = new SparkConf().setMaster("local[2]").setAppName("workWC1") conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@5d08a65c // 设置窗口期时间为3s scala> val ssc = new StreamingContext(conf,Seconds(3)) ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@44a0e68f 
  1. 使用 socketTextStream 连接上端口 9999
scala> val lines = ssc.socketTextStream("localhost",9999) lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@ 
  1. 使用 flatMap(_.split()) 切分展开数据源输入的每条数据
scala> val words = lines.flatMap(_.split(" ")) words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@22f756c5 
  1. 将切分每条数据的每个元素组成元组形式的 (key,1)
scala> val pairs = words.map(word => (word,1)) pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@6a16ee0d 
  1. 使用 reduceByKey(+) 对每个key聚合,统计key的数量
scala> val wordCounts = pairs.reduceByKey(_+_) wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@7b031f53 
  1. 打印输出结果
scala> wordCounts.print() 
  1. 启动 spark streaming
ssc.start() 
  1. 在测试端口 9999 输入测试数据:
hello word hello tom hello jerry hello word 

在这里插入图片描述

  1. 输入 ctrl + D 结束任务

② java高级API运行

  1. 配置pom文件:pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>SparkStreaming-pr</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <encoding>UTF-8</encoding> <scala.version>2.11.8</scala.version> <spark.version>2.2.0</spark.version> <hadoop.version>2.7.1</hadoop.version> <scala.compat.version>2.11</scala.compat.version> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-flume_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> </dependencies> <build> <sourceDirectory>src/main/scala</sourceDirectory> <!--<testSourceDirectory>src/test/scala</testSourceDirectory>--> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter><artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass></mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project> 
  1. 创建main目录下创建 resources 文件夹,拷贝一份 log4j.properties 文件
    在这里插入图片描述
    设置为资源目录:
    在这里插入图片描述


log4j.properties

# contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Set everything to be logged to the console log4j.rootCategory= WARN, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n 
  1. 创建测试类:SparkStreamingWordCount.scala
package pr import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{ 
   DStream, ReceiverInputDStream} import org.apache.spark.streaming.{ 
   Seconds, StreamingContext} object SparkStreamingWordCount extends App{ 
    //todo 创建一个spark StreamingContext对象 val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("demo1") val ssc = new StreamingContext(conf, Seconds(5)) //todo 使用spark streaming进行word count val inputDstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop-single", 9999) //todo 对输入流进行操作 val wordDstream: DStream[String] = inputDstream.flatMap(_.split(" ")) val wordAndOneDstream: DStream[(String, Int)] = wordDstream.map((_, 1)) val wordcounts: DStream[(String, Int)] = wordAndOneDstream.reduceByKey(_ + _) wordcounts.print() //todo 通过start() 启动消息采集和处理 ssc.start() //todo 等待程序终止 ssc.awaitTermination() } 
  1. 运行程序
  2. 在测试端口 9999 输入测试数据:
hello word hello tom hello jerry hello word 
  1. IDEA控制台打印结果如下:
    在这里插入图片描述

二、两个receiver接收两个输入源端口

2.1 测试源端口

  1. 测试端口1: 9999
nc -lk 9999 
  1. 测试端口2: 3333
nc -lk 3333 

2.2 spark streaming 接收处理数据

spark shell 命令行:

scala> sc.stop scala> import org.apache.spark._ import org.apache.spark._ scala> import org.apache.spark.streaming._ import org.apache.spark.streaming._ scala> val conf = new SparkConf().setMaster("local[3]").setAppName("workWC2") conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@7d95a717 scala> val ssc = new StreamingContext(conf,Seconds(3)) ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@58d4fe33 // 使用 union 联合两个receiver scala> val lines = ssc.socketTextStream("localhost",9999).union(ssc.socketTextStream("localhost",3333)) lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.UnionDStream@5263f554 scala> val words = lines.flatMap(_.split(" ")) words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@ scala> val pairs = words.map(word => (word,1)) pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@386ec37d scala> val wordCounts = pairs.reduceByKey(_+_) wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@50d02417 scala> wordCounts.print() scala> ssc.start() 

分别在 99993333 窗口同时输入

hello word hello spark 

可以看到词频统计 union 在了一起
在这里插入图片描述
如果在3s窗口期只输入一次数据,那么只会统计一次词频
在这里插入图片描述


注: receiver也可以先分开写再 union

val lines1 = ssc.socketTextStream("localhost",9999) val lines2 = ssc.socketTextStream("localhost",3333) val lines = lines1.union(lines2) 

三、设置窗口的时长与步长统计(一个receiver一个socket)

3.1 测试源端口

  1. 测试端口1: 9999
nc -lk 9999 

3.2 spark streaming 接收处理数据

spark shell 命令行:

  • 设置窗口时间30s,移动长度10s reduceByKeyAndWindow(x:Int,y:Int) => x+y,Seconds(30),Seconds(10))
sc.stop import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setMaster("local[2]").setAppName("workWC3") val ssc = new StreamingContext(conf,Seconds(5)) val lines = ssc.socketTextStream("localhost",9999) val wcStream = lines.flatMap(_.split(" ")).map(x => (x,1)).reduceByKeyAndWindow((x:Int,y:Int) => x+y,Seconds(30),Seconds(10)) wcStream.print() ssc.start 

在 9999 端口输入:

hello word hello spark 

可以看到 虽然spark streaming context 对象设置了窗口期为5秒,但是 reduceByKeyAndWindow 设置了窗口的移动不出为10秒,这里会以美10秒一个步长显示统计词频
在这里插入图片描述
在这里插入图片描述


四、读取hdfs上的数据实现WordCount

  1. hdfs上创建文件夹
hdfs dfs -mkdir -p /SparkStreaming/test 
  1. 创建测试类: HDFSInputDStreamDemo.scala
package pr import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{ 
     Seconds, StreamingContext} object HDFSInputDStreamDemo extends App{ 
      val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("hdfsDemo") val ssc = new StreamingContext(conf, Seconds(5)) //todo 创建一个输入流,读取文件系统上的数据(文件在hdfs中需要是新放入的,已经存在文件夹中的文件是不会读的) val lines: DStream[String] = ssc.textFileStream("hdfs://hadoop-single:9000/SparkStreaming/test") val wordcounts: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) wordcounts.print() ssc.start() ssc.awaitTermination() } 
  1. 启动程序
  2. 将文本数据放至hdfs指定文件夹中:
    注意: 文件在hdfs中需要是新放入的,已经存在文件夹中的文件是不会读的

wordCount.txt

hello world hello people hello java hadoop scala spark hive java spark hive 

上传文件:

hdfs dfs -put wordCounts.txt /SparkStreaming/test 
  1. IDEA控制台打印结果如下:
    在这里插入图片描述

五、两个InputDStream join连接

  1. 创建测试类: JoinDemo.scala
package pr import org.apache.spark.{ 
     SparkConf, SparkContext} import org.apache.spark.streaming.{ 
     Seconds, StreamingContext} object JoinDemo extends App { 
      val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JoinDemo") val ssc = new StreamingContext(conf,Seconds(5)) val sc: SparkContext = ssc.sparkContext val input1 = List((1, true), (2, false), (3, false), (4, true), (5, false)) val input2 = List((1, false), (2, false), (3, true), (4, true), (5, true)) val rdd1 = sc.parallelize(input1) val rdd2 = sc.parallelize(input2) import scala.collection.mutable //todo 通过queueStream的方式创建InputDStream,一般测试的环境下使用 val ds1 = ssc.queueStream[(Int, Boolean)](mutable.Queue(rdd1)) val ds2 = ssc.queueStream[(Int, Boolean)](mutable.Queue(rdd2)) val ds = ds1.join(ds2) ds.print() ssc.start() ssc.awaitTerminationOrTimeout(5000) } 
  1. 运行程序
  2. IDEA控制台打印结果如下:
    在这里插入图片描述



版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/233128.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • java工厂类理解

    java工厂类理解

    2021年7月16日
    72
  • 【STM32】HAL库 STM32CubeMX教程十—DAC「建议收藏」

    【STM32】HAL库 STM32CubeMX教程十—DAC「建议收藏」前言:本系列教程将对应外设原理,HAL库与STM32CubeMX结合在一起讲解,使您可以更快速的学会各个模块的使用所用工具:1、芯片:STM32F407ZET6/STM32F103ZET62、STM32CubeMx软件3、IDE:MDK-Keil软件4、STM32F1xx/STM32F4xxHAL库知识概括:通过本篇博客您将学到:DAC工作原理STM32CubeMX创建…

    2022年5月30日
    63
  • 简易旋转倒立摆_180度旋转气缸调节角度

    简易旋转倒立摆_180度旋转气缸调节角度旋转倒立摆调节经验前言程序框架关于直立关于自动起摆前言近期在做2013年电赛控制类题目–简易旋转倒立摆装置,自己并不是自动化专业的学生,没有学过自动控制原理,倒立摆其实是一个十分经典的自动控制模型,我们只能是边做边学习,逐渐去了解倒立摆。我认为倒立摆有两个难点,一个是自动起摆一个是机械结构,其中自动起摆涉及到PID算法与运动方程的求解,而机械结构主要是尽量减小转动阻尼同时避免旋转时线的缠绕。…

    2022年8月18日
    8
  • ncodeURIComponent() 函数 vue内容

    ncodeURIComponent() 函数 vue内容

    2022年3月1日
    45
  • 公网IP、私网IP、动态IP、静态IP

    公网IP、私网IP、动态IP、静态IPIP地址这个词我们经常听到,它具体是个什么东西这里就不谈了,建议去看一下本系列文章的《IP地址和MAC地址简介》一文。本文主要来区分一下公网IP、私网IP、动态IP、静态IP这四个概念。先说公网IP和私网IP。我们都听过IP地址,但对公网IP和私网IP可能就比较挠头了。把IP的问题放在一边,我们先说一下什么是公网,什么是私网。公网又名广域网、外网,指的就是我们平时说的互联网。私网又名内网、局域网,指的是路由器或交换机下创建的局部互联网络。网络的功能说的简单些就是使多台计算机实现互联,可以互相传输数据。广

    2022年6月7日
    31
  • 【JS字符串方法】JS字符串方法

    【JS字符串方法】JS字符串方法字符串的 ES5 和 ES6 方法 ES5String fromCharCode 该方法的参数是一系列 Unicode 码点 返回对应的字符串 charAt 该方法返回指定位置的字符 参数是从 0 开始编号的位置 charCodeAt 方法返回给定位置字符的 Unicode 码点 十进制表示 相当于 String fromCharCode 的逆操作 concat 方法用于连

    2025年10月28日
    6

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号