kafka topicPartitions问题

kafka topicPartitions问题当我实现三个消费者去同时消费一个Topic(默认没有分区)消息时,三个消费者同时消费到了同样的消息现象如下:2019-06-1115:30:02.516[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]INFOcom.example.kafka.consumer.TestB-==[cousumerC…

大家好,又见面了,我是你们的朋友全栈君。

当我实现三个消费者去同时消费一个Topic(默认没有分区)消息时,三个消费者同时消费到了同样的消息

现象如下:

2019-06-11 15:30:02.516 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerC]==1560238202513====sendTest3===3
2019-06-11 15:30:02.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerA]==1560238202513====sendTest3===3
2019-06-11 15:30:02.518 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerB]==1560238202513====sendTest3===3
2019-06-11 15:30:04.518 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerA]==1560238204513====sendTest3===4
2019-06-11 15:30:04.518 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerC]==1560238204513====sendTest3===4
2019-06-11 15:30:04.518 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerB]==1560238204513====sendTest3===4

消费者

@KafkaListener(groupId = "test2",topicPartitions = { 
   @TopicPartition(topic = "hzl.test.aaa",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "-1"))})
public void cousumerA(String msg) { 
   
    logger.info("==[cousumerA]==" + msg);
}

@KafkaListener(groupId = "test2",topicPartitions = { 
   @TopicPartition(topic = "hzl.test.aaa",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "-1"))})
public void cousumerB(String msg) { 
   
    logger.info("==[cousumerB]==" + msg);
}

@KafkaListener(groupId = "test2",topicPartitions = { 
   @TopicPartition(topic = "hzl.test.aaa",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "-1"))})
public void cousumerC(String msg) { 
   
    logger.info("==[cousumerC]==" + msg);
}

生产者


    @Autowired
    KafkaTemplate kafkaTemplate;

    int i =0;
    @Scheduled(fixedRate = 2000)
    public void sendTest3() { 
   
        kafkaTemplate.send("hzl.test.aaa", System.currentTimeMillis() + "====sendTest3===" + i++);
    }

如果采用如下方式,则只会被消费一次

    @KafkaListener(topics =  "hzl.test.aaa",groupId = "test2",)
    public void cousumerD(String msg) { 
   
        logger.info("==[cousumerD]==" + msg);
    }



    @KafkaListener(topics =  "hzl.test.aaa",groupId = "test2",)
    public void cousumerE(String msg) { 
   
        logger.info("==[cousumerE]==" + msg);
    }


    @KafkaListener(topics =  "hzl.test.aaa",groupId = "test2",)
    public void cousumerF(String msg) { 
   
        logger.info("==[cousumerF]==" + msg);
    }

另外,如何监听topic中最新的消息

auto-offset-reset: latest 

这样设置好像也是从消费组中提交后的offset开始消费的,并不是最新的一条消息?

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/153317.html原文链接:https://javaforall.net

(0)
上一篇 2022年6月26日 上午9:00
下一篇 2022年6月26日 上午9:00


相关推荐

  • web漏洞扫描工具集合「建议收藏」

    web漏洞扫描工具集合「建议收藏」最好用的开源Web漏洞扫描工具梳理链接:www.freebuf.com/articles/web/155209.html赛门铁克2017年互联网安全威胁报告中提出在他们今年扫描的网站中,有76%都

    2022年8月5日
    6
  • linux shell 编程书籍

    linux shell 编程书籍

    2026年3月17日
    2
  • mysql分区表_MySQL分区分表[通俗易懂]

    mysql分区表_MySQL分区分表[通俗易懂]1、为什么要分表?数据库数据越来越大,随之而来的是单个表中数据太多。以至于查询速度变慢,而且由于表的锁机制导致应用操作也搜到严重影响,出现了数据库性能瓶颈。mysql中有一种机制是表锁定和行锁定,是为了保证数据的完整性。表锁定表示你们都不能对这张表进行操作,必须等我对表操作完才行。行锁定也一样,别的sql必须等我对这条数据操作完了,才能对这条数据进行操作。当出现这种情况时,我们可以考虑分表或分区。…

    2022年4月29日
    53
  • ssm框架实现用户登录的拦截器和过滤器[通俗易懂]

    ssm框架实现用户登录的拦截器和过滤器[通俗易懂]文章只要是实现用户登录过程的验证,用拦截器可以拦截用户没有登录的情况下,不能进行访问系统页面以下是自定义拦截器工程目录实现的过程:1:新建一个interceptor拦截器包,创建一个LoginInterceptor拦截器类2:将这个类,继承HandlerInterceptor接口,并实现HandlerInterceptor这个接口的三个方法public…

    2022年7月17日
    17
  • Qt中的QFile读写文件操作

    Qt中的QFile读写文件操作1.首先记录一下QString,QByteArray,char*之间的转换(1)QString->QByteArrayQStringbuf="123";QByteArraya=buf.toUtf8();//中文a=buf.toLocal8Bit();//本地编码(2)QByteArray->char*char*b=a.data…

    2022年6月13日
    30
  • centos7 安装图形界面

    centos7 安装图形界面本人用VMware版本15.0.2build-10952284(关系不大)开启前要在虚拟机–>设置–>3D图形如下如果是云服务器跳过此步yum下载图形界面软件yumgroupinstall”GNOMEDesktop””GraphicalAdministrationTools”修改配置ln-sf/lib/systemd/system…

    2022年5月16日
    44

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号