kafka topicPartitions问题

kafka topicPartitions问题当我实现三个消费者去同时消费一个Topic(默认没有分区)消息时,三个消费者同时消费到了同样的消息现象如下:2019-06-1115:30:02.516[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]INFOcom.example.kafka.consumer.TestB-==[cousumerC…

大家好,又见面了,我是你们的朋友全栈君。

当我实现三个消费者去同时消费一个Topic(默认没有分区)消息时,三个消费者同时消费到了同样的消息

现象如下:

2019-06-11 15:30:02.516 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerC]==1560238202513====sendTest3===3
2019-06-11 15:30:02.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerA]==1560238202513====sendTest3===3
2019-06-11 15:30:02.518 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerB]==1560238202513====sendTest3===3
2019-06-11 15:30:04.518 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerA]==1560238204513====sendTest3===4
2019-06-11 15:30:04.518 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerC]==1560238204513====sendTest3===4
2019-06-11 15:30:04.518 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerB]==1560238204513====sendTest3===4

消费者

@KafkaListener(groupId = "test2",topicPartitions = { 
   @TopicPartition(topic = "hzl.test.aaa",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "-1"))})
public void cousumerA(String msg) { 
   
    logger.info("==[cousumerA]==" + msg);
}

@KafkaListener(groupId = "test2",topicPartitions = { 
   @TopicPartition(topic = "hzl.test.aaa",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "-1"))})
public void cousumerB(String msg) { 
   
    logger.info("==[cousumerB]==" + msg);
}

@KafkaListener(groupId = "test2",topicPartitions = { 
   @TopicPartition(topic = "hzl.test.aaa",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "-1"))})
public void cousumerC(String msg) { 
   
    logger.info("==[cousumerC]==" + msg);
}

生产者


    @Autowired
    KafkaTemplate kafkaTemplate;

    int i =0;
    @Scheduled(fixedRate = 2000)
    public void sendTest3() { 
   
        kafkaTemplate.send("hzl.test.aaa", System.currentTimeMillis() + "====sendTest3===" + i++);
    }

如果采用如下方式,则只会被消费一次

    @KafkaListener(topics =  "hzl.test.aaa",groupId = "test2",)
    public void cousumerD(String msg) { 
   
        logger.info("==[cousumerD]==" + msg);
    }



    @KafkaListener(topics =  "hzl.test.aaa",groupId = "test2",)
    public void cousumerE(String msg) { 
   
        logger.info("==[cousumerE]==" + msg);
    }


    @KafkaListener(topics =  "hzl.test.aaa",groupId = "test2",)
    public void cousumerF(String msg) { 
   
        logger.info("==[cousumerF]==" + msg);
    }

另外,如何监听topic中最新的消息

auto-offset-reset: latest 

这样设置好像也是从消费组中提交后的offset开始消费的,并不是最新的一条消息?

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/153317.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • phpstorm 2021.5.2 激活码【在线注册码/序列号/破解码】「建议收藏」

    phpstorm 2021.5.2 激活码【在线注册码/序列号/破解码】,https://javaforall.net/100143.html。详细ieda激活码不妨到全栈程序员必看教程网一起来了解一下吧!

    2022年3月17日
    45
  • matlab极性电容叫什么,什么是无极性电容[通俗易懂]

    描述无极电容就是没有极性电源正负极的电容器,无极性电容器的两个电极可以在电路中随意的接入。因为这款电容不存在漏电的现象,主要应用在耦合,退耦,反馈,补偿,振荡等电路中。图为无极性电容参考图。无极电容就是没有极性电源正负极的电容器,无极性电容器的两个电极可以在电路中随意的接入。因为这款电容不存在漏电的现象,主要应用在耦合,退耦,反馈,补偿,振荡等电路中。无极性电容两电极无正负极性之分,主要用于交流电…

    2022年4月12日
    49
  • 如何卸载干净JAVA?「建议收藏」

    如何卸载干净JAVA?「建议收藏」有很多小伙伴下载了JAVA的JDK(java开发工具包)并安装成功运行后,发现自己下错了版本。凉了,半天白搞了。卸载之后又发现在再安装出现安装不了的问题。这往往是因为JAVA并没有卸载完全。今天我们就看看如何完全卸载JAVA。JAVA卸载有两种方式。手动和用JAVA卸载工具。第一种,手动。1.打开控制面板,找到卸载程序,在找到java的程序,并卸载。2.这样之后,java虽然看不见了。但是还没有卸载干净。打开命令行窗口,输入命令regited。打开注册表窗口,删除ja…

    2022年5月19日
    36
  • HashMap与ConcurrentHashMap的区别「建议收藏」

    HashMap与ConcurrentHashMap的区别「建议收藏」从JDK1.2起,就有了HashMap,正如前一篇文章所说,HashMap不是线程安全的,因此多线程操作时需要格外小心。在JDK1.5中,伟大的DougLea给我们带来了concurrent包,从此Map也有安全的了。ConcurrentHashMap具体是怎么实现线程安全的呢,肯定不可能是每个方法加synchronized,那样就变成了HashTable。从Conc

    2022年6月24日
    20
  • UiAutomator Android 的自动测试框架(基础)「建议收藏」

    UiAutomator Android 的自动测试框架(基础)「建议收藏」</pre>很久没更新博客了,今天至后期的一段时间将带给大家的是<spanstyle=”font-family:微软雅黑;font-size:14px;line-height:21px;widows:auto;”>UiAutomatorandroid的自动测试框架,一系列的介绍,希望大家喜欢。</span><p></p…

    2022年10月18日
    0
  • matlab超前滞后校正装置设计_matlab劳斯判据

    matlab超前滞后校正装置设计_matlab劳斯判据且1引言不确定性与时滞是工业过程中普通存在的现象,这使得系统的分析与综合变得更加复杂和困难,同时也是导致系统不稳定和性能恶化的主要因素.因此,对不确定时滞系统的鲁棒控制问题进行研究,具有重要的理论意义和实际应用价值.近年来,不确定时滞系统的稳定性研究得到了广泛的关注匡5}.中立时滞系统作为一类非常重要的控制系统,其稳定性研究己有不少有价值的结论{3一11}.中立时滞系统的稳定条件可分为两大类:时滞…

    2022年9月30日
    0

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号