kafka topicPartitions问题

kafka topicPartitions问题当我实现三个消费者去同时消费一个Topic(默认没有分区)消息时,三个消费者同时消费到了同样的消息现象如下:2019-06-1115:30:02.516[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]INFOcom.example.kafka.consumer.TestB-==[cousumerC…

大家好,又见面了,我是你们的朋友全栈君。

当我实现三个消费者去同时消费一个Topic(默认没有分区)消息时,三个消费者同时消费到了同样的消息

现象如下:

2019-06-11 15:30:02.516 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerC]==1560238202513====sendTest3===3
2019-06-11 15:30:02.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerA]==1560238202513====sendTest3===3
2019-06-11 15:30:02.518 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerB]==1560238202513====sendTest3===3
2019-06-11 15:30:04.518 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerA]==1560238204513====sendTest3===4
2019-06-11 15:30:04.518 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerC]==1560238204513====sendTest3===4
2019-06-11 15:30:04.518 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerB]==1560238204513====sendTest3===4

消费者

@KafkaListener(groupId = "test2",topicPartitions = { 
   @TopicPartition(topic = "hzl.test.aaa",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "-1"))})
public void cousumerA(String msg) { 
   
    logger.info("==[cousumerA]==" + msg);
}

@KafkaListener(groupId = "test2",topicPartitions = { 
   @TopicPartition(topic = "hzl.test.aaa",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "-1"))})
public void cousumerB(String msg) { 
   
    logger.info("==[cousumerB]==" + msg);
}

@KafkaListener(groupId = "test2",topicPartitions = { 
   @TopicPartition(topic = "hzl.test.aaa",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "-1"))})
public void cousumerC(String msg) { 
   
    logger.info("==[cousumerC]==" + msg);
}

生产者


    @Autowired
    KafkaTemplate kafkaTemplate;

    int i =0;
    @Scheduled(fixedRate = 2000)
    public void sendTest3() { 
   
        kafkaTemplate.send("hzl.test.aaa", System.currentTimeMillis() + "====sendTest3===" + i++);
    }

如果采用如下方式,则只会被消费一次

    @KafkaListener(topics =  "hzl.test.aaa",groupId = "test2",)
    public void cousumerD(String msg) { 
   
        logger.info("==[cousumerD]==" + msg);
    }



    @KafkaListener(topics =  "hzl.test.aaa",groupId = "test2",)
    public void cousumerE(String msg) { 
   
        logger.info("==[cousumerE]==" + msg);
    }


    @KafkaListener(topics =  "hzl.test.aaa",groupId = "test2",)
    public void cousumerF(String msg) { 
   
        logger.info("==[cousumerF]==" + msg);
    }

另外,如何监听topic中最新的消息

auto-offset-reset: latest 

这样设置好像也是从消费组中提交后的offset开始消费的,并不是最新的一条消息?

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/153317.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 最新QT下载和安装 指南教程「建议收藏」

    最新QT下载和安装 指南教程「建议收藏」原文地址:http://c.biancheng.net/view/3851.htmlQt体积很大,有1GB~3GB,官方下载通道非常慢,相信很多读者会崩溃,所以建议大家使用国内的镜像网站(较快),或者使用迅雷下载(很快)。作为Qt下载教程,本文会同时讲解以上三种下载方式。Qt官方下载(非常慢)Qt官网有一个专门的资源下载网站,所有的开发环境和相关工具都可以从这里下载,具体地址是:http://download.qt.io/图1:Qt官方下载网站截图对目录结构的…

    2022年5月17日
    37
  • springcloud 架构图「建议收藏」

    springcloud 架构图「建议收藏」

    2022年6月8日
    33
  • 数字信号处理课程实验报告(数字信号处理需要什么基础)

    问题重述 DSP课程实验计算机模拟产生多频率信号:编写通用的FFT子程序 设置参数,对信号进行频谱分析 对信号分别以满足和不满足奈奎斯特采样定理的采样率进行采样,观察其频谱变化 设计低通、高通、带通和带阻滤波器,对多频率信号进行滤波处理 撰写实验报告,内容包括实验步骤、流程图、源程序、设置参数、输出结果(图)、结果分析(结合原理)例如:模拟信号:用一个FFT处理…

    2022年4月15日
    135
  • 用matlab画一元二次函数图像_matlab绘制二元函数的三维图

    用matlab画一元二次函数图像_matlab绘制二元函数的三维图二元函数可以用mesh或者surf函数画图。下面举例说明:[X,Y]=meshgrid(-8:.5:8);Z=sqrt(X.^2+Y.^2);mesh(X,Y,Z)图像如下:觉得有帮助就采纳吧www.mh456.com防采集。^subplot221fimplicit3(@5261(x,y,z)x.^41022+y.^2-z)title(‘z=x^2+1653y^2’)subplot222…

    2025年9月24日
    4
  • 此工作站和主域间的信任失败原因_电脑域改为工作组后无法登录

    此工作站和主域间的信任失败原因_电脑域改为工作组后无法登录Thedirectoryserverfailedtoautomaticallyupdateserviceaccount,dnsnameand/orportinformation.这个错误通常是由于访问的主机不能再确保可以和当前加入的活动目录域进行安全通信造成的。当前主机的私有安全凭据和域控制器中的值不匹配。当然简单的可以把安全凭据理解为密码,实际上你知道域环境通过非常严格Kerberos验证,因此实际是Kerberos的Keytable的加密存储在本地安全授权子系统中;

    2022年10月19日
    2
  • Google资深工程师深度讲解Go语言-函数式编程(六)

    Google资深工程师深度讲解Go语言-函数式编程(六)

    2022年2月16日
    46

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号