Kafka Streams之WordCount

Kafka Streams之WordCount一、实现流程1、注意Kafka中的数据都以<key,value>的形式存在。2、wordCount流程(1)Stream从topic中取出每一条数据记录(<key,value>格式):<null,”Sparkandspark”>(2)MapValue将value中所有文本转换成小写形式:<null,”sparkan…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全家桶1年46,售后保障稳定

一、实现流程

1、注意

Kafka中的数据都以<key, value>的形式存在。

2、wordCount流程

(1)Stream 从topic中取出每一条数据记录 (<key, value>格式): <null, “Spark and spark”>

(2)MapValue 将value中所有文本转换成小写形式:<null, “spark and spark”>

(3)FlatMapValues 按空格分解成单词 :<null, “spark”>,<null, “and”>, <null, “spark”>

(4)SelectKey 将value的值赋给key :<“spark”, “spark”>,<“and”, “and”>, <“spark”, “spark”>

(5)GroupByKey 按相同的Key分组 :(<“spark”, “spark”>, <“spark, “spark”>),(<“and”, “and”>)

(6)Count 计算每个组中元素个数 :<“spark”, 2>,<“and”, 1>

(7)To 将结果返回Kafka

二、代码实现

1、pom依赖

       <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.11.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.2</version>
        </dependency>

Jetbrains全家桶1年46,售后保障稳定

2、kafkaStreams主程序

package com.cn.kafkaStreams;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;

public class KafkaStreamsMain {
    public static void main(String[] args) {
        //首先进行配置
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.230.21:6667,192.168.230.22:6667,192.168.230.23:6667");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        //构建KStream
        KStream<String, String> textLines = builder.stream("test_wordCount");

        //得到结果后将其存储为KTable
        KTable<String, Long> wordCounts =
                //将数据记录中的大写全部替换成小写:
                textLines.mapValues(values -> values.toLowerCase())
                //将各行数据按空格拆分
                /**
                 * 由于flatMapValues(ValueMapper<? super V, ? extends Iterable<? extends VR>> var1)
                 * key: ? super V
                 * value(属于集合): ? extends Iterable<? extends VR>
                 * 故将数组转化为集合方式:Arrays.asList()
                 */
                .flatMapValues(values -> Arrays.asList(values.split(" ")))
                //将value作为新的key
                .selectKey((key, word) -> word)
                //aggregation操作前group by key:
                .groupByKey()
                //计算每个组中的元素个数
                .count(Materialized.as("Counts"));
                //将KStream写回Kafka,key为String,value为Long。
        wordCounts.toStream().to("test_out", Produced.with(Serdes.String(), Serdes.Long()));
        Topology topology = builder.build();
        //System.out.println(topology.describe());
        KafkaStreams kafkaStreams = new KafkaStreams(topology, config);
        kafkaStreams.start();
    }
}

3、向kafka造数据

package com.cn.kafkaStreams;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class kafkaProducer {
    static String arr[]={"Spark is spark","hbase can save bigdata","hive can select data"};
    static int p= -1;
    public static String getWord(){
        p=(p+1)%arr.length;
        return arr[p];
    }

    public static void main(String[] args) {
        String topic = "test_wordCount";
        String brokers = "192.168.230.21:6667,192.168.230.22:6667,192.168.230.23:6667";
        //设置属性,配置
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("metadata.broker.list", brokers);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //生成producer对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        //传输数据
        while (true) {
            String event = getWord();
            System.out.println(event);
            //发送数据
            producer.send(new ProducerRecord<String, String>(topic, event));
            try{
                Thread.sleep(2000);
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}

4、消费回写kafka的结果

package com.cn.kafkaStreams;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class kafkaConsumerMain {
    public static void main(String[] args) {
        // Kafka consumer configuration settings
        String topicName = "test_out";
        Properties props = new Properties();

        props.put("bootstrap.servers", "192.168.230.21:6667,192.168.230.22:6667,192.168.230.23:6667");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
        // Kafka Consumer subscribes list of topics here.
        kafkaConsumer.subscribe(Arrays.asList(topicName));

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(5);
            for (ConsumerRecord<String, String> record : records) {
                // print the offset,key and value for the consumer records.
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
            }
        }

    }
}

三、控制台输出

1、kafkaProducer

...
Spark is spark
hbase can save bigdata
hive can select data
Spark is spark
hbase can save bigdata
hive can select data

...

2、kafkaConsumerMain

...
offset = 32, key = spark, value = 45
offset = 33, key = hbase, value = 40
offset = 34, key = save, value = 82
offset = 35, key = bigdata, value = 40
offset = 36, key = hive, value = 37
offset = 37, key = can, value = 163
offset = 38, key = select, value = 65
offset = 39, key = data, value = 123
offset = 40, key = is, value = 48
offset = 41, key = spark, value = 55
offset = 42, key = hbase, value = 45
offset = 43, key = save, value = 87
offset = 44, key = bigdata, value = 45
offset = 45, key = hive, value = 42
offset = 46, key = can, value = 173
offset = 47, key = select, value = 70
offset = 48, key = data, value = 128
...

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/227640.html原文链接:https://javaforall.net

(0)
上一篇 2025年6月20日 上午11:22
下一篇 2025年6月20日 下午12:01


相关推荐

  • AcWing 2060. 奶牛选美(双端队列BFS)

    AcWing 2060. 奶牛选美(双端队列BFS)题目描述 听说最近两斑点的奶牛最受欢迎 约翰立即购进了一批两斑点牛 不幸的是 时尚潮流往往变化很快 当前最受欢迎的牛变成了一斑点牛 约翰希望通过给每头奶牛涂色 使得它们身上的两个斑点能够合为一个斑点 让它们能够更加时尚 牛皮可用一个 N MN MN M 的字符矩阵来表示 如下所示 XXXX XXX XXXX XX XXXX XXX XXXXX XXX 其中 X 表示斑点

    2025年11月6日
    4
  • 2022保密教育线上培训考试 01[通俗易懂]

    2022保密教育线上培训考试 01[通俗易懂]试题1单选题1.机关、单位应当严格按照经过批准的范围对外提供涉密资料,并与外方签订(),限定涉密资料的使用和知悉范围。正确答案:B.保密协议2.按照公职人员政务处分法有关规定,有()行为造成不良后果或者影响的,予以警告、记过或者记大过;情节较重的,予以降级或者撤职;情节严重的,予以开除。正确答案:D.以上都正确3.下列关于涉密载体销毁的说法错误的是()。正确答案:B.涉密载体销毁的登记、审批记录无须保存4.保密期限是对国家秘密采取保密措施的时间要求。保密期限包括的形式有()。正

    2022年10月1日
    4
  • python执行cmd命令并解析结果_python如何打包成可执行程序

    python执行cmd命令并解析结果_python如何打包成可执行程序身为一个编程小白,最近刚刚接手工作。近期在使用pyinstaller的时候,发现了一个很严重的问题,那就是缺少包。当时并不知道,因为代码已经是上任大佬写好的,我可以直接用。先说一下问题吧(图片忘了保存)。执行文件突然死掉,当时手速够快截了一个图。原因是缺少了这么个包。pkg_resources.py2_warn具体的解决方法如下:顺便说一下,pyinstaller的安装方法,很简单。首先打开你的pycharm,然后点击左下角的Terminal之后输入命令 pipinstallpyins

    2022年8月28日
    7
  • 十进制小数如何转换为二进制小数[通俗易懂]

    十进制小数如何转换为二进制小数[通俗易懂]关于十进制小数转换为二进制数,下面是我的详细操作说明,仅供参考。纯小数首先,最高位是符号位,正数是0,负数是1;小数部分乘以2,然后取整数部分,,剩余小数部分继续乘以2,取整数部分,……直到小数部分为0。以+0.125为例:+数,最高位为0;小数部分0.125×2=0.25,取0;再取小数部分0.25×2=0.5,取0;再取小数部分0.5×2=1.0,取1;这时小数部分是0,结束。…

    2026年3月8日
    11
  • pandas—dropna[通俗易懂]

    pandas—dropna[通俗易懂]文章目录1.pd.Series.dropna官方案例2.pd.DataFrame.dropna官方案例1.pd.Series.dropnaSeries.dropna(axis=0,inplace=False,how=None)描述返回删除了缺失值的新Series参数axis:{0or‘index’},default0只有一个轴可以从中删除值inplace:bool,defaultFalse如果为True,则就地修改返回None如果为False,则

    2025年6月3日
    7
  • 启动磁盘不能被分区或恢复成单个分区-解决了

    启动磁盘不能被分区或恢复成单个分区-解决了1,找到mac自带的“磁盘工具”。2,使用磁盘工具把多余的系统盘选择使用“抹掉”功能。3,抹掉磁盘选格式改为EXFAT格式。4,再一次使用BootCamp就可以恢复了。抹掉后,可以在磁盘分区中直接删除对应windows分区,这样就会合并为一个分区了,可以接着重新安装windows步骤参考文档:此文档少了上面的步骤,所以失败https://blog.csdn.net/weixin_35940949/article/details/112507403安装参考文档https://..

    2022年8月11日
    94

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号