kafka topicPartitions问题

kafka topicPartitions问题当我实现三个消费者去同时消费一个Topic(默认没有分区)消息时,三个消费者同时消费到了同样的消息现象如下:2019-06-1115:30:02.516[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]INFOcom.example.kafka.consumer.TestB-==[cousumerC…

大家好,又见面了,我是你们的朋友全栈君。

当我实现三个消费者去同时消费一个Topic(默认没有分区)消息时,三个消费者同时消费到了同样的消息

现象如下:

2019-06-11 15:30:02.516 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerC]==1560238202513====sendTest3===3
2019-06-11 15:30:02.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerA]==1560238202513====sendTest3===3
2019-06-11 15:30:02.518 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerB]==1560238202513====sendTest3===3
2019-06-11 15:30:04.518 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerA]==1560238204513====sendTest3===4
2019-06-11 15:30:04.518 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerC]==1560238204513====sendTest3===4
2019-06-11 15:30:04.518 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerB]==1560238204513====sendTest3===4

消费者

@KafkaListener(groupId = "test2",topicPartitions = { 
   @TopicPartition(topic = "hzl.test.aaa",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "-1"))})
public void cousumerA(String msg) { 
   
    logger.info("==[cousumerA]==" + msg);
}

@KafkaListener(groupId = "test2",topicPartitions = { 
   @TopicPartition(topic = "hzl.test.aaa",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "-1"))})
public void cousumerB(String msg) { 
   
    logger.info("==[cousumerB]==" + msg);
}

@KafkaListener(groupId = "test2",topicPartitions = { 
   @TopicPartition(topic = "hzl.test.aaa",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "-1"))})
public void cousumerC(String msg) { 
   
    logger.info("==[cousumerC]==" + msg);
}

生产者


    @Autowired
    KafkaTemplate kafkaTemplate;

    int i =0;
    @Scheduled(fixedRate = 2000)
    public void sendTest3() { 
   
        kafkaTemplate.send("hzl.test.aaa", System.currentTimeMillis() + "====sendTest3===" + i++);
    }

如果采用如下方式,则只会被消费一次

    @KafkaListener(topics =  "hzl.test.aaa",groupId = "test2",)
    public void cousumerD(String msg) { 
   
        logger.info("==[cousumerD]==" + msg);
    }



    @KafkaListener(topics =  "hzl.test.aaa",groupId = "test2",)
    public void cousumerE(String msg) { 
   
        logger.info("==[cousumerE]==" + msg);
    }


    @KafkaListener(topics =  "hzl.test.aaa",groupId = "test2",)
    public void cousumerF(String msg) { 
   
        logger.info("==[cousumerF]==" + msg);
    }

另外,如何监听topic中最新的消息

auto-offset-reset: latest 

这样设置好像也是从消费组中提交后的offset开始消费的,并不是最新的一条消息?

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/153317.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 最大似然估计,最大后验估计,贝叶斯估计联系与区别

    最大似然估计,最大后验估计,贝叶斯估计联系与区别1.什么是参数在机器学习中,我们经常使用一个模型来描述生成观察数据的过程。例如,我们可以使用一个随机森林模型来分类客户是否会取消订阅服务(称为流失建模),或者我们可以用线性模型根据公司的广告支出来预测公司的收入(这是一个线性回归的例子)。每个模型都包含自己的一组参数,这些参数最终定义了模型本身。我们可以把线性模型写成y=mx+c的形式。在广告预测收入的例子中,x可以表示广告支…

    2022年10月19日
    4
  • 【spring】IoC原理[通俗易懂]

    【spring】IoC原理[通俗易懂]【spring】IoC原理

    2022年4月25日
    56
  • activity工作流引擎学习笔记1(初始)

    activity工作流引擎学习笔记1(初始)activiti工作流引擎学习笔记

    2022年7月11日
    22
  • Mayavi入门_乐理知识入门

    Mayavi入门_乐理知识入门环境,win7/1064位,python3.x1,安装Mayavi4.6原装的pip下载奇慢,先更换一下源,豆瓣的更新要比清华的快首先在window的文件夹窗口输入:%APPDATA%

    2022年8月5日
    10
  • linux+shell脚本100,shell脚本(shell编程100例)

    linux+shell脚本100,shell脚本(shell编程100例)ShellScript,Shell脚本与Windows/Dos下的批处理类似,也便是用各类指令预先放入到一个文件中,便利一次性执行的一个程序文件,主要是便利办理员进行设置或许办理用的。可是它比Windows下的批处理更强大,比用其他编程程序修改的程序功率更高,它使用了Linux/Unix下的指令。shell编程100例1、编写helloworld脚本#!/bin/bash#编写helloworld…

    2022年10月3日
    3
  • pycharm代码灰色_python import灰色

    pycharm代码灰色_python import灰色问题表述:pycharm中老是import失败,呈现灰色线,我按照、CSDN博客上给的设置“右键点击自己的工作空间,找下面的MarkDirectoryas选择SourceRoot”,但是未解决问题,说明不是我的文件存放不在一个频道(import文件首先会在相同的目录下面寻找)问题在于:我把两个文件相似文件名同时放在一个sourceroot下面,可能会有干扰,于是,解决办法是:完美…

    2022年8月25日
    7

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号