RocketMQ消费端有两种获取消息的方式,Push方式和Pull方式。但这两种方式都有一定的缺陷,后来采用了一种折中的方法,采用”长轮询“的方式,它既可以拥有Pull的优点,又能达到保证实时性的目的。
何时调用?
当未在Broker中查找到新信息时,状态代码为PULL_NOT_FOUND,会创建拉取任务PullRequest并提交到PullRequestHoldService线程中。
private ConcurrentMap
pullRequestTable = new ConcurrentHashMap
(1024);
该类中有一个重要的参数pullRequestTable,key为“主题@队列号”,value是对应的ManyPullRequest。
先看一下它的run方法。
public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { //Consumer订阅消息时,Broker是否开启长轮询 if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { //开启长轮询,每5秒尝试一次 this.waitForRunning(5 * 1000); } else { //没有开启长轮询,默认等待1秒再次尝试 this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } long beginLockTimestamp = this.systemClock.now(); this.checkHoldRequest(); long costTime = this.systemClock.now() - beginLockTimestamp; if (costTime > 5 * 1000) { log.info("[NOTIFYME] check hold request cost {} ms.", costTime); } } catch (Throwable e) { log.warn(this.getServiceName() + " service has exception. ", e); } } log.info("{} service end", this.getServiceName()); }
从上面可以看出,如果开启了长轮询,每5s尝试一次,利用checkHoldRequest方法来判断是否有新消息的产生。如果未开启长轮询,则默认1s再次尝试。
然后再阅读一下checkHoldRequest方法。
private void checkHoldRequest() { for (String key : this.pullRequestTable.keySet()) { String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR); if (2 == kArray.length) { String topic = kArray[0]; int queueId = Integer.parseInt(kArray[1]); final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId); try { this.notifyMessageArriving(topic, queueId, offset); } catch (Throwable e) { log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e); } } } }
从上面可以看出,它会遍历pullRequestTable,从key名中可以得到主题名topic和队列名queueId,然后通过topic和queueID获取到该消息队列的最大偏移量,之后调用notifyMessageArriving方法。
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map
properties) { String key = this.buildKey(topic, queueId); ManyPullRequest mpr = this.pullRequestTable.get(key); if (mpr != null) { List
requestList = mpr.cloneListAndClear(); if (requestList != null) { List
replayList = new ArrayList
(); for (PullRequest request : requestList) { long newestOffset = maxOffset; if (newestOffset <= request.getPullFromThisOffset()) { newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId); } if (newestOffset > request.getPullFromThisOffset()) { boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap)); // match by bit map, need eval again when properties is not null. if (match && properties != null) { match = request.getMessageFilter().isMatchedByCommitLog(null, properties); } if (match) { try { this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); } continue; } } if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) { try { this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); } continue; } replayList.add(request); } if (!replayList.isEmpty()) { mpr.addPullRequest(replayList); } } } }
在notifyMessageArriving方法中,首先会获取到当前该主题、队列中的所有的挂起拉取任务,如果该消息队列的最大偏移量大于待拉取偏移量,说明有新的消息传入。如果消息匹配后,则调用executeRequestWhenWakeup将消息返回给消息拉取客户端,否则等待下一次尝试。
如果挂起超时时间超时,则不继续等待将直接返回客户消息未找到。
从上面的机制可以看出开启长轮询后,不是实时的进行判断是否有新的消息产生,而是等待5s后再进行一次判断,不具有实时性。
在消息存储中,存在一个线程ReputMessageService,它会实时更新消息消费队列和索引文件,每执行一次任务推送后会休息1毫秒就继续尝试推送消息到消费队列和索引文件。
当新消息达到CommitLog时,ReputMessageService线程负责将消息转发给ConsumeQueue、IndexFile,如果Broker端开启了长轮询模式并且角色主节点,则最终将调用PullRequestHoldService线程的notifyMessageArriving方法唤醒挂起线程,判断当前消费队列最大偏移量是否大于待拉取偏移量,如果大于则拉取消息。长轮询模式使得消息消息拉取能够实现准实时。
0人点赞
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/225955.html原文链接:https://javaforall.net
