Spring Boot 中使用@KafkaListener并发批量接收消息[通俗易懂]

Spring Boot 中使用@KafkaListener并发批量接收消息[通俗易懂]kakfa是我们在项目开发中经常使用的消息中间件。由于它的写性能非常高,因此,经常会碰到Kafka消息队列拥堵的情况。碰到这种情况时,有不能直接清理整改消息队列,因为还有别的服务正在使用该队列。因此只能额外启动一个相同名称的consumer-group来加快消息消费(经测试,如果该topic只有一个分区,实际上再启动一个新的消费者作用不到)。具体代码在这里,欢迎加星号,fork。官方文档……

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

kakfa是我们在项目开发中经常使用的消息中间件。由于它的写性能非常高,因此,经常会碰到读取Kafka消息队列时拥堵的情况。遇到这种情况时,有时我们不能直接清理整个topic,因为还有别的服务正在使用该topic。因此只能额外启动一个相同名称的consumer-group来加快消息消费(如果该topic只有一个分区,再启动一个新的消费者,没有作用)。

完整的代码在这里,欢迎加星号、fork。

官方文档在https://docs.spring.io/spring-kafka/reference/html/_reference.html

###第一步,并发消费###
先看代码,重点是这我们使用的是ConcurrentKafkaListenerContainerFactory并且设置了factory.setConcurrency(4); (我的topic有4个分区,为了加快消费将并发设置为4,也就是有4个KafkaMessageListenerContainer)

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

注意也可以直接在application.properties中添加spring.kafka.listener.concurrency=3,然后使用@KafkaListener并发消费。

###第二步,批量消费###
然后是批量消费。重点是factory.setBatchListener(true);
以及 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
一个设启用批量消费,一个设置批量消费每次最多消费多少条消息记录。

重点说明一下,我们设置的ConsumerConfig.MAX_POLL_RECORDS_CONFIG是50,并不是说如果没有达到50条消息,我们就一直等待。官方的解释是”The maximum number of records returned in a single call to poll().”, 也就是50表示的是一次poll最多返回的记录数。

从启动日志中可以看到还有个 max.poll.interval.ms = 300000, 也就说每间隔max.poll.interval.ms我们就调用一次poll。每次poll最多返回50条记录。

max.poll.interval.ms官方解释是”The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. “;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

   @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset());
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        return propsMap;
    }

启动日志截图
这里写图片描述

关于max.poll.records和max.poll.interval.ms官方解释截图:
这里写图片描述

###第三步,分区消费###
对于只有一个分区的topic,不需要分区消费,因为没有意义。下面的例子是针对有2个分区的情况(我的完整代码中有4个listenPartitionX方法,我的topic设置了4个分区),读者可以根据自己的情况进行调整。

public class MyListener {
    private static final String TPOIC = "topic02";

    @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) })
    public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
        log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id0 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p0 Received message={}",  message);
            }
        }
    }

    @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) })
    public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
        log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id1 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p1 Received message={}",  message);
            }
        }
}

关于分区和消费者关系,后面会补充,先摘录如下:
If, say, 6 TopicPartition s are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartition s, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.

最后,总结,如果我们的topic有多个分区,经过以上步骤可以很好的加快消息消费。如果只有一个分区,因为已经有一个同名group id在消费了,新启动的一个基本上没有作用(本人测试结果)。

具体代码在这里,欢迎加星号,fork。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/181878.html原文链接:https://javaforall.net

(0)
上一篇 2022年10月15日 上午6:16
下一篇 2022年10月15日 上午6:36


相关推荐

  • 微信小程序跳转传值(微信怎样打开小程序)

    前情:首先我们有这么一种需求,就是我在一个列表中点击了某个item,跳转到详情界面,那么我就需要把item的实体数据从列表页面传递到详情页面,那么我们来看看微信小程序给我们提供的API:先看api:这里大家可以清楚看到api中说到的如何传递参数,其实它这里指的参数仅仅是一些普通的数据类型具体分析:这里我们要传递的实体是object类型,那么我们需要先把实体转…

    2022年4月12日
    54
  • 十八万字《python从零到精通教程》第二版,贴心保姆教你从零变大神,学不会找我「建议收藏」

    文章目录一.pycharm下载安装二.python下载安装三.pycharm上配置python四.配置镜像源让你下载嗖嗖的快4.1pycharm内部配置4.2手动添加镜像源4.3永久配置镜像源五.插件安装(比如汉化?)5.1自动补码神器第一款5.2自动补码神器第二款5.2汉化pycharm5.3其它插件六.美女背景七.自定义脚本开头八、这个前言一定要看九、python入门十、python缩进十一、Python注释1.单行注释2.多行注释十二、Python变量1.变量定义理解2.变量名命名3.分配多个

    2022年4月10日
    39
  • python字符串删除指定符号(不限位置)「建议收藏」

    python字符串删除指定符号(不限位置)「建议收藏」python中去掉字符串中某些不想要的字符:1、一般的可以用replace()这个函数不限定位置,是可以替换原来不想要的字符,替换成空字符就相当于删除了2、也可以用strip(),删除两边的字符(默认是删除左右空格)rstrip(),lstrip()这两个可以选择只删除左边或者右边3、re.sub这个可以根据正则删除,此处是删除串中的数字1-9,字符a-z,A-Z,还可以加其他的importrestr=”aksj2343ngr4545g黄金叶子fg”temp=re.sub(‘[a

    2022年6月4日
    76
  • 百度地图的开发版sha1和发布版sha1的获取方式「建议收藏」

    百度地图的开发版sha1和发布版sha1的获取方式「建议收藏」百度地图SDK在实际开发中也算是最常用的SDK之一,但是不少新手开发者对申请密钥时,填写SHA1有不少疑问,在此解答进入百度地图SDK申请密钥会看到这样的场景这里不得不说几件事在安卓开发中,打包生成APK时,我们通常有两种方式重点:也就是所谓的debug版本和release版本,这两个签名方式是不一样的。调试(

    2022年8月10日
    7
  • 常见DML语句汇总

    常见DML语句汇总DML 操作是指对数据中表记录的操作 主要包括表记录的插入 insert 更新 update 删除 delete 和查询 select 是开发人员日常使用最频繁的操作 下面依次对它们进行介绍 1 插入记录 INSERTINTOta field1 field2 fieldn VALUES value1 vaule2 valuen 2

    2026年3月26日
    2

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号