SpringBoot集成kafka测试

SpringBoot集成kafka测试本文是 SpringBoot Kafka 的实战讲解 如果对 kafka 的架构原理还不了解的读者 建议先看一下 大白话 kafka 架构原理 秒懂 kafkaHA 高可用 两篇文章 一 生产者实践 普通生产者 带回调的生产者 自定义分区器 kafka 事务提交 二 消费者实践 简单消费 指定 topic partition offset 消费 批量消费 监听异常处理器 消息过滤器 消息转发 定时启动 停止监听

本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下《大白话kafka架构原理》《秒懂kafka HA(高可用)》两篇文章。

一、生产者实践

  • 普通生产者
  • 带回调的生产者
  • 自定义分区器
  • kafka事务提交

二、消费者实践

  • 简单消费
  • 指定topic、partition、offset消费
  • 批量消费
  • 监听异常处理器
  • 消息过滤器
  • 消息转发
  • 定时启动/停止监听器

一、前戏

1、在项目中连接kafka,因为是外网,首先要开放kafka配置文件中的如下配置(其中IP为公网IP),

advertised.listeners=PLAINTEXT://112.126.74.249:9092

2、在开始前我们先创建两个topic:topic1、topic2,其分区和副本数都设置为2,用来测试

  1. [root@iZ2zegzlkedbo3e64vkbefZ ~]# cd /usr/local/kafka-cluster/kafka1/bin/
  2. [root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh –create –zookeeper 172.17.80.219:2181 –replication-factor 2 –partitions 2 –topic topic1
  3. Created topic topic1.
  4. [root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh –create –zookeeper 172.17.80.219:2181 –replication-factor 2 –partitions 2 –topic topic2
  5. Created topic topic2.

当然我们也可以不手动创建topic,在执行代码kafkaTemplate.send(“topic1”, normalMessage)发送消息时,kafka会帮我们自动完成topic的创建工作,但这种情况下创建的topic默认只有一个分区,分区也没有副本。所以,我们可以在项目中新建一个配置类专门用来初始化topic,如下,

   

import org.apache.kafka.clients.admin.NewTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class KafkaInitialConfiguration { // 创建一个名为testtopic的Topic并设置分区数为3,分区副本数为2 @Bean public NewTopic initialTopic() { return new NewTopic("testtopic",3, (short) 2 ); } //如果要修改分区数,只需要修改配置重启项目即可 //修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小 @Bean public NewTopic updateTopic() { return new NewTopic("testtopic",5, (short) 2 ); } }

3、新建SpringBoot项目

① 引入pom依赖

 
   
   
     org.springframework.kafka 
    
   
     spring-kafka 
    
  

② application.propertise配置(本文用到的配置项这里全列了出来) 

【Kafka集群】 #spring.kafka.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093 spring.kafka.bootstrap-servers=192.168.2.188:9092 【初始化生产者配置】 # 重试次数 spring.kafka.producer.retries=0 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1) spring.kafka.producer.acks=1 # 批量大小 spring.kafka.producer.batch-size=16384 # 提交延时 spring.kafka.producer.properties.linger.ms=0 # 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka # linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了 ​ # 生产端缓冲区大小 spring.kafka.producer.buffer-memory =  # Kafka提供的序列化和反序列化类 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 自定义分区器 # spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner ​ 【初始化消费者配置】 # 默认的消费组ID spring.kafka.consumer.properties.group.id=defaultConsumerGroup # 是否自动提交offset spring.kafka.consumer.enable-auto-commit=true # 提交offset延时(接收到消息后多久提交offset) spring.kafka.consumer.auto.commit.interval.ms=1000 # 当kafka中没有初始offset或offset超出范围时将自动重置offset # earliest:重置为分区中最小的offset; # latest:重置为分区中最新的offset(消费分区中新产生的数据); # none:只要有一个分区不存在已提交的offset,就抛出异常; spring.kafka.consumer.auto-offset-reset=latest # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作) spring.kafka.consumer.properties.session.timeout.ms= # 消费请求超时时间 spring.kafka.consumer.properties.request.timeout.ms= # Kafka提供的序列化和反序列化类 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 消费端监听的topic不存在时,项目启动会报错(关掉) spring.kafka.listener.missing-topics-fatal=false # 设置批量消费 # spring.kafka.listener.type=batch # 批量消费每次最多消费多少条消息 # spring.kafka.consumer.max-poll-records=50 

二、Hello Kafka

1、简单生产者

import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; @RestController public class KafkaProducer { @Autowired private KafkaTemplate 
  
    kafkaTemplate; // 发送消息 @GetMapping("/kafka/normal/{message}") public void sendMessage1(@PathVariable("message") String normalMessage) { kafkaTemplate.send("topic1", normalMessage); } } 
  

 2、简单消费

import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class KafkaConsumer { // 消费监听 @KafkaListener(topics = {"topic1"}) public void onMessage1(ConsumerRecord 
   record){ // 消费的哪个topic、partition的消息,打印出消息内容 System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value()); } }

上面示例创建了一个生产者,发送消息到topic1,消费者监听topic1消费消息。监听器用@KafkaListener注解,topics表示监听的topic,支持同时监听多个,用英文逗号分隔。启动项目,postman调接口触发生产者发送消息,

SpringBoot集成kafka测试

可以看到监听器消费成功,

SpringBoot集成kafka测试

三、生产者

1、带回调的生产者

kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法,

 @GetMapping("/kafka/callbackOne/{message}") public void sendMessage2(@PathVariable("message") String callbackMessage) { kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> { // 消息发送到的topic String topic = success.getRecordMetadata().topic(); // 消息发送到的分区 int partition = success.getRecordMetadata().partition(); // 消息在分区内的offset long offset = success.getRecordMetadata().offset(); System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset); }, failure -> { System.out.println("发送消息失败:" + failure.getMessage()); }); }

 

@GetMapping("/kafka/callbackTwo/{message}") public void sendMessage3(@PathVariable("message") String callbackMessage) { kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback 
  
    >() { @Override public void onFailure(Throwable ex) { System.out.println("发送消息失败:"+ex.getMessage()); } @Override public void onSuccess(SendResult 
   
     result) { System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-" + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset()); } }); } 
    
  

2、自定义分区器

我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:

① 若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;

② 若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;

③  patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;

※ 我们来自定义一个分区策略,将消息发送到我们指定的partition,首先新建一个分区器类实现Partitioner接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区,

import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; public class CustomizePartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // 自定义分区规则(这里假设全部发到0号分区) // ...... return 0; } @Override public void close() { } @Override public void configure(Map 
  
    configs) { } } 
  

在application.propertise中配置自定义分区器,配置的值就是分区器类的全路径名,

   

# 自定义分区器 spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

3、kafka事务提交

如果在发送消息时需要创建事务,可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务,

 
  1. @GetMapping(“/kafka/transaction”)
  2. public void sendMessage7(){
  3. // 声明事务:后面报错消息不会发出去
  4. kafkaTemplate.executeInTransaction(operations -> {
  5. operations.send(“topic1″,”test executeInTransaction”);
  6. throw new RuntimeException(“fail”);
  7. });
  8. // 不声明事务:后面报错但前面消息已经发送成功了
  9. kafkaTemplate.send(“topic1″,”test executeInTransaction”);
  10. throw new RuntimeException(“fail”);
  11. }
  • 1

四、消费者

1、指定topic、partition、offset消费

前面我们在监听消费topic1的时候,监听的是topic1上所有的消息,如果我们想指定topic、指定partition、指定offset来消费呢?也很简单,@KafkaListener注解已全部为我们提供,

 
  1. /
  2. * @Title 指定topic、partition、offset消费
  3. * @Description 同时监听topic1和topic2,监听topic1的0号分区、topic2的 “0号和1号” 分区,指向1号分区的offset初始值为8
  4. * @Author long.yuan
  5. * @Date 2020/3/22 13:38
  6. * @Param [record]
  7. * @return void
  8. /
  9. @KafkaListener(id = “consumer1”,groupId = “felix-group”,topicPartitions = {
  10. @TopicPartition(topic = “topic1”, partitions = { “0” }),
  11. @TopicPartition(topic = “topic2”, partitions = “0”, partitionOffsets = @PartitionOffset(partition = “1”, initialOffset = “8”))
  12. })
  13. public void onMessage2(ConsumerRecord
    record) {
  14. System.out.println(“topic:”+record.topic()+”|partition:”+record.partition()+”|offset:”+record.offset()+”|value:”+record.value());
  15. }
  • 1

属性解释:

① id:消费者ID;

② groupId:消费组ID;

③ topics:监听的topic,可监听多个;

④ topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。

上面onMessage2监听的含义:监听topic1的0号分区,同时监听topic2的0号分区和topic2的1号分区里面offset从8开始的消息。

注意:topics和topicPartitions不能同时使用;

2、批量消费

设置application.prpertise开启批量消费即可,

 
  1. # 设置批量消费
  2. spring.kafka.listener.type=batch
  3. # 批量消费每次最多消费多少条消息
  4. spring.kafka.consumer.max-poll-records=50
  • 1

接收消息时用List来接收,监听代码如下,

 
  1. @KafkaListener(id = “consumer2”,groupId = “felix-group”, topics = “topic1”)
  2. public void onMessage3(List

    > records) {

  3. System.out.println(“>>>批量消费一次,records.size()=”+records.size());
  4. for (ConsumerRecord
    record : records) {
  5. System.out.println(record.value());
  6. }
  7. }
  • 1

3、ConsumerAwareListenerErrorHandler 异常处理器

通过异常处理器,我们可以处理consumer在消费时发生的异常。

新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,用@Bean注入,BeanName默认就是方法名,然后我们将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器,

 
  1. // 新建一个异常处理器,用@Bean注入
  2. @Bean
  3. public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
  4. return (message, exception, consumer) -> {
  5. System.out.println(“消费异常:”+message.getPayload());
  6. return null;
  7. };
  8. }
  9. // 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
  10. @KafkaListener(topics = {“topic1”},errorHandler = “consumerAwareErrorHandler”)
  11. public void onMessage4(ConsumerRecord
    record) throws Exception {
  12. throw new Exception(“简单消费-模拟异常”);
  13. }
  14. // 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息
  15. @KafkaListener(topics = “topic1″,errorHandler=”consumerAwareErrorHandler”)
  16. public void onMessage5(List

    > records) throws Exception {

  17. System.out.println(“批量消费一次…”);
  18. throw new Exception(“批量消费-模拟异常”);
  19. }
  • 1

执行看一下效果,

SpringBoot集成kafka测试

4、消息过滤器

消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。

配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。

 
  1. @Component
  2. public class KafkaConsumer {
  3. @Autowired
  4. ConsumerFactory consumerFactory;
  5. // 消息过滤器
  6. @Bean
  7. public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
  8. ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
  9. factory.setConsumerFactory(consumerFactory);
  10. // 被过滤的消息将被丢弃
  11. factory.setAckDiscarded(true);
  12. // 消息过滤策略
  13. factory.setRecordFilterStrategy(consumerRecord -> {
  14. if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
  15. return false;
  16. }
  17. //返回true消息则被过滤
  18. return true;
  19. });
  20. return factory;
  21. }
  22. // 消息过滤监听
  23. @KafkaListener(topics = {“topic1”},containerFactory = “filterContainerFactory”)
  24. public void onMessage6(ConsumerRecord
    record) {
  25. System.out.println(record.value());
  26. }
  27. }
  • 1

上面实现了一个”过滤奇数、接收偶数”的过滤策略,我们向topic1发送0-99总共100条消息,看一下监听器的消费情况,可以看到监听器只消费了偶数,

SpringBoot集成kafka测试

5、消息转发

在实际开发中,我们可能有这样的需求,应用A从TopicA获取到消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。

在SpringBoot集成Kafka实现消息的转发也很简单,只需要通过一个@SendTo注解,被注解方法的return值即转发的消息内容,如下,

 
  1. /
  2. * @Title 消息转发
  3. * @Description 从topic1接收到的消息经过处理后转发到topic2
  4. * @Author long.yuan
  5. * @Date 2020/3/23 22:15
  6. * @Param [record]
  7. * @return void
  8. /
  9. @KafkaListener(topics = {“topic1”})
  10. @SendTo(“topic2”)
  11. public String onMessage7(ConsumerRecord
    record) {
  12. return record.value()+”-forward message”;
  13. }
  • 1

6、定时启动、停止监听器

默认情况下,当消费者项目启动的时候,监听器就开始工作,监听消费发送到指定topic的消息,那如果我们不想让监听器立即工作,想让它在我们指定的时间点开始工作,或者在我们指定的时间点停止工作,该怎么处理呢——使用KafkaListenerEndpointRegistry,下面我们就来实现:

① 禁止监听器自启动;

② 创建两个定时任务,一个用来在指定时间点启动定时器,另一个在指定时间点停止定时器;

新建一个定时任务类,用注解@EnableScheduling声明,KafkaListenerEndpointRegistry 在SpringIO中已经被注册为Bean,直接注入,设置禁止KafkaListener自启动,

 
  1. @EnableScheduling
  2. @Component
  3. public class CronTimer {
  4. /
  5. * @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
  6. * 而是会被注册在KafkaListenerEndpointRegistry中,
  7. * 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
  8. /
  9. @Autowired
  10. private KafkaListenerEndpointRegistry registry;
  11. @Autowired
  12. private ConsumerFactory consumerFactory;
  13. // 监听器容器工厂(设置禁止KafkaListener自启动)
  14. @Bean
  15. public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
  16. ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
  17. container.setConsumerFactory(consumerFactory);
  18. //禁止KafkaListener自启动
  19. container.setAutoStartup(false);
  20. return container;
  21. }
  22. // 监听器
  23. @KafkaListener(id=”timingConsumer”,topics = “topic1”,containerFactory = “delayContainerFactory”)
  24. public void onMessage1(ConsumerRecord
    record){
  25. System.out.println(“消费成功:”+record.topic()+”-“+record.partition()+”-“+record.value());
  26. }
  27. // 定时启动监听器
  28. @Scheduled(cron = “0 42 11 * * ? “)
  29. public void startListener() {
  30. System.out.println(“启动监听器…”);
  31. // “timingConsumer”是@KafkaListener注解后面设置的监听器ID,标识这个监听器
  32. if (!registry.getListenerContainer(“timingConsumer”).isRunning()) {
  33. registry.getListenerContainer(“timingConsumer”).start();
  34. }
  35. //registry.getListenerContainer(“timingConsumer”).resume();
  36. }
  37. // 定时停止监听器
  38. @Scheduled(cron = “0 45 11 * * ? “)
  39. public void shutDownListener() {
  40. System.out.println(“关闭监听器…”);
  41. registry.getListenerContainer(“timingConsumer”).pause();
  42. }
  43. }
  • 1

启动项目,触发生产者向topic1发送消息,可以看到consumer没有消费,因为这时监听器还没有开始工作,

SpringBoot集成kafka测试

11:42分监听器启动开始工作,消费消息,

SpringBoot集成kafka测试

SpringBoot集成kafka测试

11:45分监听器停止工作,

SpringBoot集成kafka测试

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/177483.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月26日 下午7:24
下一篇 2026年3月26日 下午7:25


相关推荐

  • vsftp账号_Vsftp用户限制

    vsftp账号_Vsftp用户限制背景Oracle全库备份,异地备份在实现异地备份后,由第三方人员登录服务器拉取dmp文件.为了确保安全,创建一个特定ftp账号用于第三方人员使用要求1.可以登录服务器2.可以拉取dmp文件3.仅限在dmp文件的目录下,不能cd其他路径,ls其他目录解决过程yum安装ftp服务[root@78778e06dc0a/]#yuminstallvsftpd-y修改vsftp配置文件,开启限制[…

    2025年7月20日
    7
  • js常用跳转代码[通俗易懂]

    js常用跳转代码[通俗易懂]今天整理下我们常用的js跳转代码,常用的js跳转代码比如:js倒计时跳转代码、pc站跳转手机站代码、在原来的窗体中直接js跳转用、在新窗体中打开页面用、js直接跳转代码、js返回代码、head标签内加如下代码实现页面定时自动跳转代码、self.location方式实现页面跳转、top.loca…

    2022年8月13日
    6
  • utf8在mysql占几个字符_utf-8的中文,一个字符占几个字节「建议收藏」

    utf8在mysql占几个字符_utf-8的中文,一个字符占几个字节「建议收藏」https://blog.csdn.net/kindsuper_liu/article/details/80202150英文字母和中文汉字在不同字符集编码下的字节数英文字母:·字节数:1;编码:GB2312字节数:1;编码:GBK字节数:1;编码:GB18030字节数:1;编码:ISO-8859-1字节数:1;编码:UTF-8字节数:4;编码:UTF-16字节数:2;编…

    2022年6月26日
    32
  • php(ThinkPHP)实现微信小程序的登录过程

    php(ThinkPHP)实现微信小程序的登录过程

    2021年10月13日
    75
  • js中的闭包[通俗易懂]

    js中的闭包[通俗易懂]闭包是js的一个难点也是它的一个特色,是我们必须掌握的js高级特性,那么什么是闭包呢?它又有什么用呢?我们都知道,js的作用域分两种,全局和局部,基于我们所熟悉的作用域链相关知识,我们知道在js作用域环境中访问变量的权利是由内向外的,内部作用域可以获得当前作用域下的变量并且可以获得当前包含当前作用域的外层作用域下的变量,反之则不能,也就是说在外层作用域下无法获取内层作用域下的变量,同样在不同的函…

    2022年6月25日
    29
  • 分子模拟软件amber_容天AMBER优化的GPU解决方案

    分子模拟软件amber_容天AMBER优化的GPU解决方案AMBER认证的GPU系统AMBER认证GPU系统提供商容天更快地运行MD仿真容天与AMBER的主要开发商合作开发了交钥匙解决方案,为GPU加速的生物分子模拟提供增值系统。经过验证的系统,每个用户的CPU,GPU,内存和存储具有适当的平衡。从工作站到超级计算机的高度可扩展系统,具有3年保修和支持。容天AMBER优化的GPU加速系统更快地完成MD仿真并不需要花费很多。我们的AMBERG…

    2022年5月9日
    130

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号