kafka topicPartitions问题

kafka topicPartitions问题当我实现三个消费者去同时消费一个Topic(默认没有分区)消息时,三个消费者同时消费到了同样的消息现象如下:2019-06-1115:30:02.516[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]INFOcom.example.kafka.consumer.TestB-==[cousumerC…

大家好,又见面了,我是你们的朋友全栈君。

当我实现三个消费者去同时消费一个Topic(默认没有分区)消息时,三个消费者同时消费到了同样的消息

现象如下:

2019-06-11 15:30:02.516 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerC]==1560238202513====sendTest3===3
2019-06-11 15:30:02.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerA]==1560238202513====sendTest3===3
2019-06-11 15:30:02.518 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerB]==1560238202513====sendTest3===3
2019-06-11 15:30:04.518 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerA]==1560238204513====sendTest3===4
2019-06-11 15:30:04.518 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerC]==1560238204513====sendTest3===4
2019-06-11 15:30:04.518 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerB]==1560238204513====sendTest3===4

消费者

@KafkaListener(groupId = "test2",topicPartitions = { 
   @TopicPartition(topic = "hzl.test.aaa",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "-1"))})
public void cousumerA(String msg) { 
   
    logger.info("==[cousumerA]==" + msg);
}

@KafkaListener(groupId = "test2",topicPartitions = { 
   @TopicPartition(topic = "hzl.test.aaa",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "-1"))})
public void cousumerB(String msg) { 
   
    logger.info("==[cousumerB]==" + msg);
}

@KafkaListener(groupId = "test2",topicPartitions = { 
   @TopicPartition(topic = "hzl.test.aaa",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "-1"))})
public void cousumerC(String msg) { 
   
    logger.info("==[cousumerC]==" + msg);
}

生产者


    @Autowired
    KafkaTemplate kafkaTemplate;

    int i =0;
    @Scheduled(fixedRate = 2000)
    public void sendTest3() { 
   
        kafkaTemplate.send("hzl.test.aaa", System.currentTimeMillis() + "====sendTest3===" + i++);
    }

如果采用如下方式,则只会被消费一次

    @KafkaListener(topics =  "hzl.test.aaa",groupId = "test2",)
    public void cousumerD(String msg) { 
   
        logger.info("==[cousumerD]==" + msg);
    }



    @KafkaListener(topics =  "hzl.test.aaa",groupId = "test2",)
    public void cousumerE(String msg) { 
   
        logger.info("==[cousumerE]==" + msg);
    }


    @KafkaListener(topics =  "hzl.test.aaa",groupId = "test2",)
    public void cousumerF(String msg) { 
   
        logger.info("==[cousumerF]==" + msg);
    }

另外,如何监听topic中最新的消息

auto-offset-reset: latest 

这样设置好像也是从消费组中提交后的offset开始消费的,并不是最新的一条消息?

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/153317.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 基于1DCNN(一维卷积神经网络)的机械振动故障诊断

    基于1DCNN(一维卷积神经网络)的机械振动故障诊断基于1DCNN(一维卷积神经网络)的机械振动故障诊断机械振动故障诊断最为经典的还是凯斯西储实验室的轴承故障诊断,开学一周了,上次改编鸢尾花分类的代码可用,但是并不准确。开学一周重新改编了别人的一篇代码,亲测好用。不多咧咧直接放上去(基于Tensorflow2.0)(Spyder4软件上跑的)数据集时本人把凯西轴承实验驱动端内圈损坏尺寸0.14和0.21做的二分类,数据集中0代表的0.14而1代表的0.21具体看下面最后#-*-coding:utf-8-*-“””CreatedonTue

    2022年6月8日
    97
  • 有线如何通过笔记本无线共享上网_笔记本插网线怎么共享WiFi

    有线如何通过笔记本无线共享上网_笔记本插网线怎么共享WiFi半年前搬到新住的地方,由于条件限制,房间就一根网线,我跟我老婆两人晚上都需要用到电脑,于是萌生了通过路由器来达到多机器同时上网,不过最后失败。当时没想起大学时代经常干的事情(学校寝室上网要账号,账号都要钱的):通过一台电脑上网,所有其他电脑都通过这台机器上网。半年之后的今天,由于工作实在需要网络,这种条件下,让我想起了曾经的这么一回事,因此我在想,既然能通过有线达到共享网络,无线原理应该一样吧…

    2025年6月3日
    0
  • Okio源码分析

    Okio源码分析【参考资料】拆轮子系列:拆Okio

    2022年4月30日
    32
  • DropDownList绑定数据源的方法[通俗易懂]

    DropDownList绑定数据源的方法[通俗易懂]web       DropDownList绑定数据源的几种方式 第一种                this.ddltype.DataTextField= “btName”;//显示的值                this.ddltype.DataValueField=”btId”;//获取dropdownlist中的值                ddltype.

    2022年10月8日
    0
  • 【CEGUI】CEGUI入门篇之创建window(四)

    【CEGUI】CEGUI入门篇之创建window(四)以下内容翻译自http://static.cegui.org.uk/docs/0.8.7/window_tutorial.html这里介绍CEGUIwindow的创建及如何让window在屏幕上显示出来,在此之前,需要了解“CEGUI入门篇之初始化(一)”、“CEGUI入门篇之使用ResourceProvider加载资源(二)”和“CEGUI入门篇之数据文件及默认初始化(三)”。1、window和

    2022年7月23日
    10
  • androidstudio 优化gradle编译效率[通俗易懂]

    androidstudio 优化gradle编译效率

    2022年1月24日
    276

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号