RocketMQ消息消费之长轮询

RocketMQ消息消费之长轮询RocketMQ 消费端有两种获取消息的方式 Push 方式和 Pull 方式 但这两种方式都有一定的缺陷 后来采用了一种折中的方法 采用 长轮询 的方式 它既可以拥有 Pull 的优点 又能达到保证实时性的目的 长轮询的思想 服务端接收到新消息请求后 如果队列里没有新消息 并不急于返回 通过一个循环不断查看状态 每次 waitForRunni 一段时间 默认是 5 秒 然后再 Check Broker 默认最长阻塞时间为 15 秒 默认情况下当 Broker 一直没有新消息 第三次 Check 的时候 等待时间超过最长阻塞时间

RocketMQ消费端有两种获取消息的方式,Push方式和Pull方式。但这两种方式都有一定的缺陷,后来采用了一种折中的方法,采用”长轮询“的方式,它既可以拥有Pull的优点,又能达到保证实时性的目的。

何时调用?

当未在Broker中查找到新信息时,状态代码为PULL_NOT_FOUND,会创建拉取任务PullRequest并提交到PullRequestHoldService线程中。

private ConcurrentMap 
  
    pullRequestTable = new ConcurrentHashMap 
   
     (1024); 
    
  

该类中有一个重要的参数pullRequestTable,key为“主题@队列号”,value是对应的ManyPullRequest。

先看一下它的run方法。

 public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { //Consumer订阅消息时,Broker是否开启长轮询 if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { //开启长轮询,每5秒尝试一次 this.waitForRunning(5 * 1000); } else { //没有开启长轮询,默认等待1秒再次尝试 this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } long beginLockTimestamp = this.systemClock.now(); this.checkHoldRequest(); long costTime = this.systemClock.now() - beginLockTimestamp; if (costTime > 5 * 1000) { log.info("[NOTIFYME] check hold request cost {} ms.", costTime); } } catch (Throwable e) { log.warn(this.getServiceName() + " service has exception. ", e); } } log.info("{} service end", this.getServiceName()); } 

从上面可以看出,如果开启了长轮询,每5s尝试一次,利用checkHoldRequest方法来判断是否有新消息的产生。如果未开启长轮询,则默认1s再次尝试。

然后再阅读一下checkHoldRequest方法。

private void checkHoldRequest() { for (String key : this.pullRequestTable.keySet()) { String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR); if (2 == kArray.length) { String topic = kArray[0]; int queueId = Integer.parseInt(kArray[1]); final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId); try { this.notifyMessageArriving(topic, queueId, offset); } catch (Throwable e) { log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e); } } } } 

从上面可以看出,它会遍历pullRequestTable,从key名中可以得到主题名topic和队列名queueId,然后通过topic和queueID获取到该消息队列的最大偏移量,之后调用notifyMessageArriving方法。

 public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map 
  
    properties) { String key = this.buildKey(topic, queueId); ManyPullRequest mpr = this.pullRequestTable.get(key); if (mpr != null) { List 
   
     requestList = mpr.cloneListAndClear(); if (requestList != null) { List 
    
      replayList = new ArrayList 
     
       (); for (PullRequest request : requestList) { long newestOffset = maxOffset; if (newestOffset <= request.getPullFromThisOffset()) { newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId); } if (newestOffset > request.getPullFromThisOffset()) { boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap)); // match by bit map, need eval again when properties is not null. if (match && properties != null) { match = request.getMessageFilter().isMatchedByCommitLog(null, properties); } if (match) { try { this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); } continue; } } if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) { try { this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); } continue; } replayList.add(request); } if (!replayList.isEmpty()) { mpr.addPullRequest(replayList); } } } } 
      
     
    
  

notifyMessageArriving方法中,首先会获取到当前该主题、队列中的所有的挂起拉取任务,如果该消息队列的最大偏移量大于待拉取偏移量,说明有新的消息传入。如果消息匹配后,则调用executeRequestWhenWakeup将消息返回给消息拉取客户端,否则等待下一次尝试。
如果挂起超时时间超时,则不继续等待将直接返回客户消息未找到。

从上面的机制可以看出开启长轮询后,不是实时的进行判断是否有新的消息产生,而是等待5s后再进行一次判断,不具有实时性。

在消息存储中,存在一个线程ReputMessageService,它会实时更新消息消费队列和索引文件,每执行一次任务推送后会休息1毫秒就继续尝试推送消息到消费队列和索引文件。

当新消息达到CommitLog时,ReputMessageService线程负责将消息转发给ConsumeQueue、IndexFile,如果Broker端开启了长轮询模式并且角色主节点,则最终将调用PullRequestHoldService线程的notifyMessageArriving方法唤醒挂起线程,判断当前消费队列最大偏移量是否大于待拉取偏移量,如果大于则拉取消息。长轮询模式使得消息消息拉取能够实现准实时。

0人点赞

RocketMQ

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/225955.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月17日 上午8:10
下一篇 2026年3月17日 上午8:10


相关推荐

  • Java后台接收参数出现java.lang.Integer cannot be cast to java.lang.Double错误(已解决)[通俗易懂]

    Java后台接收参数出现java.lang.Integer cannot be cast to java.lang.Double错误(已解决)[通俗易懂]在Java接受前端传过来的数据信息的时候,使用List<List<double>>进行接收结果出现这个错误java.lang.Integercannotbecasttojava.lang.Double是类型转换出现的错误,当是这个数据在前端明明处理过,使用parseFloat转为了浮点数后端使用List<List>进行接收,此时也没有报错于是打开debug进行调试检查问题,发现传过来的数值如果是整数则为Integer类型,有小数的才是double类型

    2022年7月16日
    21
  • vs2010旗舰版_密钥

    vs2010旗舰版_密钥YCFHQ-9DWCY-DKV88-T2TMH-G7BHP

    2022年5月3日
    44
  • PowerShell AD 管理

    PowerShell AD 管理

    2022年2月21日
    107
  • LaTeX参考文献引用显示?问号解决办法!!!!「建议收藏」

    LaTeX参考文献引用显示?问号解决办法!!!!「建议收藏」在使用Latex之前,我们一般会借用已有的论文模板,在模板基础上加入我们自己的文章内容,随后编译成PDF文件,其基本流程就是:Latex->Bibtex->Latext->Latex。1、第一步点击Latex编译,可以获得*.aux文件、.dvi文件、.log文件以及*.gz文件;2、第二步点击Bibtex编译,可以获得*.blg(性能监视器文件)和*.bbl文件;3、第三…

    2025年10月11日
    4
  • 17个最佳WordPress画廊插件[通俗易懂]

    17个最佳WordPress画廊插件[通俗易懂]驯服混乱并改变您的内容。如果您想展示您的内容(帖子,图像,视频,音频文件以及您能想到的任何其他内容),从而为网站访问者带来引人入胜的体验,那么本文将帮助您做到这一点。在这里,我们重点介绍CodeCanyon上可用的一些最佳WordPress画廊插件。我们已按画廊类型对其进行了细分,因此请继续阅读以了解更多有关为什么这些是用于视频和多媒体,图像和WordPress网格的最佳WordPr…

    2022年5月28日
    117
  • Vue集成activity工作流

    Vue集成activity工作流情景:由于activiti与系统应用主题样式出入较大,协商后决定将activiti的editor-app放在前台。ps:内网开发,无图,凭记忆摘取主要内容。步骤:将activiti放在public即静态目录下。 通过iframe在相应的前台工作流界面引入activiti的model.html(最外层的主html,名字可能有出入)。 mounted时将this,即vuecompo…

    2022年6月10日
    275

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号