rocketmq延迟队列原理_rocketmq延迟队列原理

rocketmq延迟队列原理_rocketmq延迟队列原理在java的延迟队列中,无法支持集群的延迟。Redis可以做到对应的延迟功能,但是自己封装毕竟局限于业务。而且封装也需要耗费一定时间。今天我们就讲一个现有的延迟队列,不仅支持分布式服务,而且解耦业务代码,而且支持不同延迟时间的造好的轮子吧。~那就是RocketMQ延时队列。RocketMQ将延时队列的延时延时时间分为18个级别123456789101112131415161718分别对应下面的延迟时间,在使用时,直接传递level即可。mess

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全家桶1年46,售后保障稳定

在java的延迟队列中,无法支持集群的延迟。
Redis可以做到对应的延迟功能,但是自己封装毕竟局限于业务。而且封装也需要耗费一定时间。
今天我们就讲一个现有的延迟队列,不仅支持分布式服务,而且解耦业务代码,而且支持不同延迟时间的造好的轮子吧。 ~ 那就是 RocketMQ 延时队列。

RocketMQ将延时队列的延时延时时间分为18个级别

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 分别对应下面的延迟时间,在使用时,直接传递 level即可。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
当然这个时间可以自己修改,如果不维护 则按照默认的

在发送MQ消息的时候只需要设置

Message.setDelayTimeLevel(delayLevel);

Jetbrains全家桶1年46,售后保障稳定

MQ发送的代码:

public class DelayMQProducerTest { 
   
    public static void main(String[] args) throws MQClientException, InterruptedException { 
   
        DefaultMQProducer producer = new DefaultMQProducer("delay_test_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        try { 
   
            for (int i = 0; i < 3; i++) { 
   
                Message msg = new Message("Topic_Delay_Test",// topic
                        "Tag_Delay",// tag
                        (new Date() + "Topic_Delay_Test" + i).getBytes()// body
                );
                msg.setDelayTimeLevel(2); // 设置延迟级别为2 也就是 5s 
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
        } catch (Exception e) { 
   
            e.printStackTrace();
        }
       producer.shutdown();
    }
 
}

接下来就跟进到代码里看是RocketMQ是如何是做到延迟发送消息的。

本人使用的是rocketMQ 4.2 下载地址

进入Message可以看到两个方法:


  // 获取延迟等级
    public int getDelayTimeLevel() { 
   
        String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (t != null) { 
   
            return Integer.parseInt(t);
        }

        return 0;
    }
    // 设置延迟等级
    public void setDelayTimeLevel(int level) { 
   
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }

既然设置方法可以看到,那通过获取Level追看哪里使用,然后研究对应的实现。
获取Level的代码位置
在这里插入图片描述
在这里将topic和queueId替换为延迟队列的队列(SCHEDULE_TOPIC_XXXX),这样就保证消息不会立即被发送出去。 而是经过SCHEDULE_TOPIC_XXXX的特殊处理后,然后在发送到Consumer。

那在这里被替换后,是怎么保证延迟发送呢?

继续往下

由于对源码的不熟悉,也不了解,其实费了一些功夫,发现ScheduleMessageService.java 有start方法

 public void start() { 
   

        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { 
   
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) { 
   
                offset = 0L;
            }

            if (timeDelay != null) { 
   
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }

        this.timer.scheduleAtFixedRate(new TimerTask() { 
   

            @Override
            public void run() { 
   
                try { 
   
                    ScheduleMessageService.this.persist();
                } catch (Throwable e) { 
   
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }

从start方法中可以看到,这个时候就启动了定时器,开始从队列里获取数据了。 那么start方法是怎么被调用的呢?
在这里插入图片描述
在DefaultMessageStore中启动的。

接下来我们还是把注意力放在 ScheduleMessageService.start方法的执行过程吧。
通过源码追踪,就看到了这个方法

   public void executeOnTimeup() { 
   
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));

            long failScheduleOffset = offset;

            if (cq != null) { 
   
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) { 
   
                    try { 
   
                        long nextOffset = offset;
                        int i = 0;
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { 
   
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();

                            if (cq.isExtAddr(tagsCode)) { 
   
                                if (cq.getExt(tagsCode, cqExtUnit)) { 
   
                                    tagsCode = cqExtUnit.getTagsCode();
                                } else { 
   
                                    //can't find ext content.So re compute tags code.
                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                        tagsCode, offsetPy, sizePy);
                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                                }
                            }

                            long now = System.currentTimeMillis();
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                            long countdown = deliverTimestamp - now;

                            if (countdown <= 0) { 
   
                                MessageExt msgExt =
                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);

                                if (msgExt != null) { 
   
                                    try { 
   
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                        PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.defaultMessageStore
                                                .putMessage(msgInner);

                                        if (putMessageResult != null
                                            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { 
   
                                            continue;
                                        } else { 
   
                                            // XXX: warn and notify me
                                            log.error(
                                                "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                                msgExt.getTopic(), msgExt.getMsgId());
                                            ScheduleMessageService.this.timer.schedule(
                                                new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                    nextOffset), DELAY_FOR_A_PERIOD);
                                            ScheduleMessageService.this.updateOffset(this.delayLevel,
                                                nextOffset);
                                            return;
                                        }
                                    } catch (Exception e) { 
   
                                        /* * XXX: warn and notify me */
                                        log.error(
                                            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                                + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                                + offsetPy + ",sizePy=" + sizePy, e);
                                    }
                                }
                            } else { 
   
                                ScheduleMessageService.this.timer.schedule(
                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                                    countdown);
                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                return;
                            }
                        } // end of for

                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    } finally { 
   

                        bufferCQ.release();
                    }
                } // end of if (bufferCQ != null)
                else { 
   

                    long cqMinOffset = cq.getMinOffsetInQueue();
                    if (offset < cqMinOffset) { 
   
                        failScheduleOffset = cqMinOffset;
                        log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                            + cqMinOffset + ", queueId=" + cq.getQueueId());
                    }
                }
            } // end of if (cq != null)

            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);
        }

但是注意处理的逻辑就在这里了。 如果到了延迟时间,就发送消息 否则就继续进行延迟返送。
在这里插入图片描述
总结,RocketMQ的延迟消息,使用起来方便,而且解耦代码,但是配置的延迟时间不够灵活。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/234086.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 在SharePoint 2010中部署RBS (转)

    在SharePoint 2010中部署RBS (转)

    2021年12月9日
    61
  • Nginx和Apache和Tomcat的区别及优缺点「建议收藏」

    Nginx和Apache和Tomcat的区别及优缺点「建议收藏」Nginx和Apache和Tomcat的区别及优缺点1、定义:1)ApacheApacheHTTP服务器是一个模块化的服务器,可以运行在几乎所有广泛使用的计算机平台上。其属于应用服务器。Apache支持支持模块多,性能稳定,Apache本身是静态解析,适合静态HTML、图片等,但可以通过扩展脚本、模块等支持动态页面等。(Apche可以支持PHPcgiperl,但是要使用Java的话,你需…

    2022年4月26日
    42
  • microsoft edge无法连接到代理服务器(ie代理服务器错误)

    电脑里有谷歌浏览器也有系统自带的Microsoftedge浏览器,谷歌浏览器可以上网,edge不能上网,出现无法连接到代理服务器的字样。解决方法:方法1、直接根据当前浏览器页面中的“打开代理设置”,(应该是叫这个名字),直接跳到代理界面,把“自动检测设置”按钮打开。重启浏览器。就OK了。 方法2、打开浏览器的设置按钮,在最后一行找到“设置”,点进去后,找到“高级设置”,点击“查看高级设置”,在找…

    2022年4月11日
    680
  • linux获取时间戳_java时间戳转换成时间

    linux获取时间戳_java时间戳转换成时间转换成指定的日期格式,如“2021/08/2919:25:18‘:date-d@1630236318+”%Y/%m/%d%H:%M:%S”leon@ubuntu:~/work$date-d@1630236318+”%Y/%m/%d%H:%M:%S”2021/08/2911:25:18date-d@1630236318leon@ubuntu:~/work$date-d@1630236318Sun29Aug202111:25:18AMUTC…

    2022年10月2日
    2
  • mysql数据库优化大全

    mysql数据库优化大全

    2021年11月7日
    50
  • Go语言爱好者周刊:第 3 期

    Go语言爱好者周刊:第 3 期这里记录每周值得分享的Go语言相关内容,周日发布。欢迎投稿,推荐或自荐文章/软件/资源等,请提交issue[1]。鉴于大部分人可能没法坚持把英文文章看完,因此,周…

    2022年5月20日
    36

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号