rocketmq 长轮询_消息队列RocketMQ版

rocketmq 长轮询_消息队列RocketMQ版RocketMQ消费端有两种获取消息的方式,Push方式和Pull方式。但这两种方式都有一定的缺陷,后来采用了一种折中的方法,采用”长轮询“的方式,它既可以拥有Pull的优点,又能达到保证实时性的目的。长轮询的思想:服务端接收到新消息请求后,如果队列里没有新消息,并不急于返回,通过一个循环不断查看状态,每次waitForRunning一段时间(默认是5秒),然后再Check。Broker默认最长阻塞时间为15秒,默认情况下当Broker一直没有新消息,第三次Check的时候,等待时间超过最长阻塞时间,

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

RocketMQ消费端有两种获取消息的方式,Push方式和Pull方式。但这两种方式都有一定的缺陷,后来采用了一种折中的方法,采用”长轮询“的方式,它既可以拥有Pull的优点,又能达到保证实时性的目的。

长轮询的思想:
服务端接收到新消息请求后,如果队列里没有新消息,并不急于返回,通过一个循环不断查看状态,每次waitForRunning一段时间(默认是5秒),然后再Check。Broker默认最长阻塞时间为15秒,默认情况下当Broker一直没有新消息,第三次Check的时候,等待时间超过最长阻塞时间,就返回空结果。在等待的过程中,Broker收到了新的消息后会直接返回请求结果。
“长轮询”的核心是,Broker端hold住客户端过来的请求一小段时间。在这段时间内有新的消息到达,就利用现有的连接立即返回消息给Consumer。

何时调用?

当未在Broker中查找到新信息时,状态代码为PULL_NOT_FOUND,会创建拉取任务PullRequest并提交到PullRequestHoldService线程中。

private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
        new ConcurrentHashMap<String, ManyPullRequest>(1024);

该类中有一个重要的参数pullRequestTable,key为“主题@队列号”,value是对应的ManyPullRequest。

先看一下它的run方法。

 public void run() {
        log.info("{} service started", this.getServiceName());
        while (!this.isStopped()) {
            try {
                //Consumer订阅消息时,Broker是否开启长轮询
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {                //开启长轮询,每5秒尝试一次
                    this.waitForRunning(5 * 1000);
                } else {
                  //没有开启长轮询,默认等待1秒再次尝试                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }

                long beginLockTimestamp = this.systemClock.now();
                this.checkHoldRequest();
                long costTime = this.systemClock.now() - beginLockTimestamp;
                if (costTime > 5 * 1000) {
                    log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
                }
            } catch (Throwable e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        log.info("{} service end", this.getServiceName());
    }

从上面可以看出,如果开启了长轮询,每5s尝试一次,利用checkHoldRequest方法来判断是否有新消息的产生。如果未开启长轮询,则默认1s再次尝试。

然后再阅读一下checkHoldRequest方法。

private void checkHoldRequest() {
        for (String key : this.pullRequestTable.keySet()) {
            String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
            if (2 == kArray.length) {
                String topic = kArray[0];
                int queueId = Integer.parseInt(kArray[1]);
                final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                try {
                    this.notifyMessageArriving(topic, queueId, offset);
                } catch (Throwable e) {
                    log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
                }
            }
        }
    }

从上面可以看出,它会遍历pullRequestTable,从key名中可以得到主题名topic和队列名queueId,然后通过topic和queueID获取到该消息队列的最大偏移量,之后调用notifyMessageArriving方法。

 public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
                                      long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
        String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (mpr != null) {
            List<PullRequest> requestList = mpr.cloneListAndClear();
            if (requestList != null) {
                List<PullRequest> replayList = new ArrayList<PullRequest>();

                for (PullRequest request : requestList) {
                    long newestOffset = maxOffset;
                    if (newestOffset <= request.getPullFromThisOffset()) {
                        newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                    }

                    if (newestOffset > request.getPullFromThisOffset()) {
                        boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                            new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                        // match by bit map, need eval again when properties is not null.
                        if (match && properties != null) {
                            match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                        }

                        if (match) {
                            try {
                                this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                    request.getRequestCommand());
                            } catch (Throwable e) {
                                log.error("execute request when wakeup failed.", e);
                            }
                            continue;
                        }
                    }

                    if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                        try {
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }

                    replayList.add(request);
                }

                if (!replayList.isEmpty()) {
                    mpr.addPullRequest(replayList);
                }
            }
        }
    }

notifyMessageArriving方法中,首先会获取到当前该主题、队列中的所有的挂起拉取任务,如果该消息队列的最大偏移量大于待拉取偏移量,说明有新的消息传入。如果消息匹配后,则调用executeRequestWhenWakeup将消息返回给消息拉取客户端,否则等待下一次尝试。
如果挂起超时时间超时,则不继续等待将直接返回客户消息未找到。

从上面的机制可以看出开启长轮询后,不是实时的进行判断是否有新的消息产生,而是等待5s后再进行一次判断,不具有实时性。

在消息存储中,存在一个线程ReputMessageService,它会实时更新消息消费队列和索引文件,每执行一次任务推送后会休息1毫秒就继续尝试推送消息到消费队列和索引文件。

当新消息达到CommitLog时,ReputMessageService线程负责将消息转发给ConsumeQueue、IndexFile,如果Broker端开启了长轮询模式并且角色主节点,则最终将调用PullRequestHoldService线程的notifyMessageArriving方法唤醒挂起线程,判断当前消费队列最大偏移量是否大于待拉取偏移量,如果大于则拉取消息。长轮询模式使得消息消息拉取能够实现准实时。

0人点赞

RocketMQ

作者:九点半的马拉
链接:https://www.jianshu.com/p/68123e7bf03e
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/181992.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • verilog_移位寄存器_仿真(程序逐句解释)

    verilog_移位寄存器_仿真(程序逐句解释)前言  之前老是想着学的快点,就直接编译了程序就下载在开发板上跑,后来发现这样不行,因为如果程序有问题,验证和纠错的时间成本太高了(毕竟vivado跑一次花的时间很长),反过来学习仿真,下面是一点心得和体会。开发环境编译软件及版本:vivado2019.2编译语言:verilog  网上随便找了一个简单程序和仿真,先实现复现,再谈其他。下面我将先给出代码和仿真截图,再说具体的东西。移位寄存器程序代码:`timescale1ns/1ps/////////////////////////

    2022年7月16日
    8
  • navicat导出longtext类型数据乱码的解决方案

    navicat导出longtext类型数据乱码的解决方案一、先使用sql查询出需要导出的内容,将longtext类型使用cast转化成char类型SELECT company_id, wechat_mp_appid, CAST(survey_risk_tips_orgASchar)ASsurvey_risk_tips_org, CAST(survey_disclaimerASchar)ASsurvey_disclaimer, CAS…

    2022年5月14日
    47
  • DHCP原理及DHCP服务器的防攻击手段「建议收藏」

    DHCP原理及DHCP服务器的防攻击手段「建议收藏」一、DHCP简介1、产生背景:网络增大,手工配置存在很多问题【人员素质要求高、容易出错、灵活性差、IP地址资源利用率低、工作量大,不利于管理等】2、DHCP相对于静态手工配置的优点【效率高、灵活性强、易于管理等】二、DHCP的原理与配置(一)、DHCP的基本工作过程【发现阶段、提供阶段、请求阶段、确认阶段】如下图:【发现阶段】:在发现阶段,DHCP客户端会以广播的方式给自己所在在广播域…

    2022年6月17日
    33
  • Oracle恢复流程

    Oracle恢复流程备份(Backup):数据文件、控制文件、归档->备份文件还原(Restore):备份文件->数据文件状态物理恢复(Recover):数据文件+应用日志文件【归档+控制文件】时间点逻辑实验记录

    2022年7月17日
    28
  • Intellij热部署插件JRebel

    Intellij热部署插件JRebelIntellij热部署插件JRebel安装JRebel激活JRebel相关设置Intellij热部署插件JRebel项目需求,一直用eclipse的我,也要改用IDEA了,一开始,很不习惯。经过几天的慢慢摸索和习惯之后,发现IDEA确实很好用。dark的界面是我喜欢的,智能的提示也让写代码不再枯燥。遗憾的是IDEA本身没有集成热部署工具,一开始改动代码之后,都需要重新r…

    2022年5月22日
    32
  • 罗技键盘手机app_罗技k480键盘教程

    罗技键盘手机app_罗技k480键盘教程1罗技Android平板键盘:概况前言:相信有很多平板用户都羡慕华硕推出的一款带外接键盘功能的平板吧,现在不用垂涎欲滴了,因为罗技已经看到平板电脑用户群中对外接键盘输入的需求,推出了罗技平板电脑键盘,下面就和我们一起关注一下吧。罗技平板电脑键盘(支持Android)概况介绍:打开罗技平板电脑键盘的包装,我们发现只有一个黑盒子,其实这个扁平的黑盒子是用于架设放置平板电脑的。打开黑盒子,里面蓝色的内饰…

    2022年10月15日
    0

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号