rocketmq 长轮询_消息队列RocketMQ版

rocketmq 长轮询_消息队列RocketMQ版RocketMQ消费端有两种获取消息的方式,Push方式和Pull方式。但这两种方式都有一定的缺陷,后来采用了一种折中的方法,采用”长轮询“的方式,它既可以拥有Pull的优点,又能达到保证实时性的目的。长轮询的思想:服务端接收到新消息请求后,如果队列里没有新消息,并不急于返回,通过一个循环不断查看状态,每次waitForRunning一段时间(默认是5秒),然后再Check。Broker默认最长阻塞时间为15秒,默认情况下当Broker一直没有新消息,第三次Check的时候,等待时间超过最长阻塞时间,

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

RocketMQ消费端有两种获取消息的方式,Push方式和Pull方式。但这两种方式都有一定的缺陷,后来采用了一种折中的方法,采用”长轮询“的方式,它既可以拥有Pull的优点,又能达到保证实时性的目的。

长轮询的思想:
服务端接收到新消息请求后,如果队列里没有新消息,并不急于返回,通过一个循环不断查看状态,每次waitForRunning一段时间(默认是5秒),然后再Check。Broker默认最长阻塞时间为15秒,默认情况下当Broker一直没有新消息,第三次Check的时候,等待时间超过最长阻塞时间,就返回空结果。在等待的过程中,Broker收到了新的消息后会直接返回请求结果。
“长轮询”的核心是,Broker端hold住客户端过来的请求一小段时间。在这段时间内有新的消息到达,就利用现有的连接立即返回消息给Consumer。

何时调用?

当未在Broker中查找到新信息时,状态代码为PULL_NOT_FOUND,会创建拉取任务PullRequest并提交到PullRequestHoldService线程中。

private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
        new ConcurrentHashMap<String, ManyPullRequest>(1024);

该类中有一个重要的参数pullRequestTable,key为“主题@队列号”,value是对应的ManyPullRequest。

先看一下它的run方法。

 public void run() {
        log.info("{} service started", this.getServiceName());
        while (!this.isStopped()) {
            try {
                //Consumer订阅消息时,Broker是否开启长轮询
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {                //开启长轮询,每5秒尝试一次
                    this.waitForRunning(5 * 1000);
                } else {
                  //没有开启长轮询,默认等待1秒再次尝试                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }

                long beginLockTimestamp = this.systemClock.now();
                this.checkHoldRequest();
                long costTime = this.systemClock.now() - beginLockTimestamp;
                if (costTime > 5 * 1000) {
                    log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
                }
            } catch (Throwable e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        log.info("{} service end", this.getServiceName());
    }

从上面可以看出,如果开启了长轮询,每5s尝试一次,利用checkHoldRequest方法来判断是否有新消息的产生。如果未开启长轮询,则默认1s再次尝试。

然后再阅读一下checkHoldRequest方法。

private void checkHoldRequest() {
        for (String key : this.pullRequestTable.keySet()) {
            String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
            if (2 == kArray.length) {
                String topic = kArray[0];
                int queueId = Integer.parseInt(kArray[1]);
                final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                try {
                    this.notifyMessageArriving(topic, queueId, offset);
                } catch (Throwable e) {
                    log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
                }
            }
        }
    }

从上面可以看出,它会遍历pullRequestTable,从key名中可以得到主题名topic和队列名queueId,然后通过topic和queueID获取到该消息队列的最大偏移量,之后调用notifyMessageArriving方法。

 public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
                                      long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
        String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (mpr != null) {
            List<PullRequest> requestList = mpr.cloneListAndClear();
            if (requestList != null) {
                List<PullRequest> replayList = new ArrayList<PullRequest>();

                for (PullRequest request : requestList) {
                    long newestOffset = maxOffset;
                    if (newestOffset <= request.getPullFromThisOffset()) {
                        newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                    }

                    if (newestOffset > request.getPullFromThisOffset()) {
                        boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                            new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                        // match by bit map, need eval again when properties is not null.
                        if (match && properties != null) {
                            match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                        }

                        if (match) {
                            try {
                                this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                    request.getRequestCommand());
                            } catch (Throwable e) {
                                log.error("execute request when wakeup failed.", e);
                            }
                            continue;
                        }
                    }

                    if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                        try {
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }

                    replayList.add(request);
                }

                if (!replayList.isEmpty()) {
                    mpr.addPullRequest(replayList);
                }
            }
        }
    }

notifyMessageArriving方法中,首先会获取到当前该主题、队列中的所有的挂起拉取任务,如果该消息队列的最大偏移量大于待拉取偏移量,说明有新的消息传入。如果消息匹配后,则调用executeRequestWhenWakeup将消息返回给消息拉取客户端,否则等待下一次尝试。
如果挂起超时时间超时,则不继续等待将直接返回客户消息未找到。

从上面的机制可以看出开启长轮询后,不是实时的进行判断是否有新的消息产生,而是等待5s后再进行一次判断,不具有实时性。

在消息存储中,存在一个线程ReputMessageService,它会实时更新消息消费队列和索引文件,每执行一次任务推送后会休息1毫秒就继续尝试推送消息到消费队列和索引文件。

当新消息达到CommitLog时,ReputMessageService线程负责将消息转发给ConsumeQueue、IndexFile,如果Broker端开启了长轮询模式并且角色主节点,则最终将调用PullRequestHoldService线程的notifyMessageArriving方法唤醒挂起线程,判断当前消费队列最大偏移量是否大于待拉取偏移量,如果大于则拉取消息。长轮询模式使得消息消息拉取能够实现准实时。

0人点赞

RocketMQ

作者:九点半的马拉
链接:https://www.jianshu.com/p/68123e7bf03e
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/181992.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • Python+opencv调用摄像头获取视频保存到本地并应用到YOLO中保存视频检测后的结果

    Python+opencv调用摄像头获取视频保存到本地并应用到YOLO中保存视频检测后的结果文章目录前言读写视频流获取摄像头:写入视频:完整的调用摄像头并保存视频代码应用到YOLO中总结前言之前的文章介绍了如何调用摄像头间隔拍照并保存图片(文章链接:Python+OpenCV调用摄像头固定间隔时间拍照并保存到本地同时应用到YOLO中检测目标),这篇文章再介绍一下如何调用摄像头并保存视频。读写视频流获取摄像头:capture=cv2.VideoCapture(0)ref,frame=capture.read()前文介绍过,cv2.VideoCapture()获取摄像头

    2022年6月22日
    30
  • pycharm安装CV2,tensorflow[通俗易懂]

    pycharm安装CV2,tensorflow[通俗易懂]在pycharm中导入importcv2时不能正常使用,需要安装cv2库,但是我们在pycharm中安装库时没有找到CV2,就需要安装opencv库,我采用的是以下的方式:1、首先看下pycharm中的环境,我现在用的环境是anaconda下的PyCharmproject2、在菜单栏中打开anacondaprompt(anaconda3)3、刚开始进入时环境是4、切换环境为pycharm中环境activatePyCharmproject5、已经下好了opencv的whl文件,我下到了3

    2022年8月26日
    10
  • 【Linux】进程间通信「建议收藏」

    【Linux】进程间通信「建议收藏」目录1.进程间通信1.1.进程间通信的目的1.2.如何实现进程间通信2.管道通信2.1.匿名管道2.1.1创建匿名管道2.1.2.深入理解匿名管道2.2.命名管道2.2.1.创建命名管道3.systemV标准进程间通信3.1.共享内存3.1.1.实现原理3.1.2.代码实现3.2.消息队列(了解)3.2.1实现原理3.3.信号量(了解)3.3.1.实现原理1.进程间通信1.1.

    2022年10月11日
    3
  • java中判断字符串是否日期格式的方法建议收藏

    大家好,又见面了,我是全栈君,今天给大家准备了Idea注册码。此处内容已经被作者隐藏,请输入验证码查看内容验证码:请关注本站微信公众号,回复“验证码”,获取验证码。在微信里搜索“全…

    未分类 2021年12月18日
    59
  • office2016专业增强版永久激活密钥 离线激活_增强版16office激活

    office2016专业增强版永久激活密钥 离线激活_增强版16office激活1.Office2016专业增强版永久激活码:MicrosoftOffice2016ProPlusRetailMak序列号XNTT9-CWMM3-RM2YM-D7KB2-JB6DVBHXN

    2022年8月5日
    7
  • xshell5连接不上虚拟机_虚拟机的网络连接设置

    xshell5连接不上虚拟机_虚拟机的网络连接设置一:首先解决的关于ping的问题1.在虚拟机中ping百度看能不能先ping通,如果虚拟机连接不上网络的话Xshell肯定是连接不上的,如果有上述情况的请点击二:检查你虚拟机中防火墙是否关闭CentOs6中查看防火墙状态:serviceiptablesstatus关闭防火墙:serviceiptablesstop禁用防火墙:chkconfigiptablesoffCentOs7中查看防火墙状态:systemctlstatusfirewalld.service关闭防火墙:

    2022年9月22日
    3

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号