rocketmq延迟队列原理_rocketmq延迟队列原理

rocketmq延迟队列原理_rocketmq延迟队列原理在java的延迟队列中,无法支持集群的延迟。Redis可以做到对应的延迟功能,但是自己封装毕竟局限于业务。而且封装也需要耗费一定时间。今天我们就讲一个现有的延迟队列,不仅支持分布式服务,而且解耦业务代码,而且支持不同延迟时间的造好的轮子吧。~那就是RocketMQ延时队列。RocketMQ将延时队列的延时延时时间分为18个级别123456789101112131415161718分别对应下面的延迟时间,在使用时,直接传递level即可。mess

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全家桶1年46,售后保障稳定

在java的延迟队列中,无法支持集群的延迟。
Redis可以做到对应的延迟功能,但是自己封装毕竟局限于业务。而且封装也需要耗费一定时间。
今天我们就讲一个现有的延迟队列,不仅支持分布式服务,而且解耦业务代码,而且支持不同延迟时间的造好的轮子吧。 ~ 那就是 RocketMQ 延时队列。

RocketMQ将延时队列的延时延时时间分为18个级别

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 分别对应下面的延迟时间,在使用时,直接传递 level即可。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
当然这个时间可以自己修改,如果不维护 则按照默认的

在发送MQ消息的时候只需要设置

Message.setDelayTimeLevel(delayLevel);

Jetbrains全家桶1年46,售后保障稳定

MQ发送的代码:

public class DelayMQProducerTest { 
   
    public static void main(String[] args) throws MQClientException, InterruptedException { 
   
        DefaultMQProducer producer = new DefaultMQProducer("delay_test_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        try { 
   
            for (int i = 0; i < 3; i++) { 
   
                Message msg = new Message("Topic_Delay_Test",// topic
                        "Tag_Delay",// tag
                        (new Date() + "Topic_Delay_Test" + i).getBytes()// body
                );
                msg.setDelayTimeLevel(2); // 设置延迟级别为2 也就是 5s 
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
        } catch (Exception e) { 
   
            e.printStackTrace();
        }
       producer.shutdown();
    }
 
}

接下来就跟进到代码里看是RocketMQ是如何是做到延迟发送消息的。

本人使用的是rocketMQ 4.2 下载地址

进入Message可以看到两个方法:


  // 获取延迟等级
    public int getDelayTimeLevel() { 
   
        String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (t != null) { 
   
            return Integer.parseInt(t);
        }

        return 0;
    }
    // 设置延迟等级
    public void setDelayTimeLevel(int level) { 
   
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }

既然设置方法可以看到,那通过获取Level追看哪里使用,然后研究对应的实现。
获取Level的代码位置
在这里插入图片描述
在这里将topic和queueId替换为延迟队列的队列(SCHEDULE_TOPIC_XXXX),这样就保证消息不会立即被发送出去。 而是经过SCHEDULE_TOPIC_XXXX的特殊处理后,然后在发送到Consumer。

那在这里被替换后,是怎么保证延迟发送呢?

继续往下

由于对源码的不熟悉,也不了解,其实费了一些功夫,发现ScheduleMessageService.java 有start方法

 public void start() { 
   

        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { 
   
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) { 
   
                offset = 0L;
            }

            if (timeDelay != null) { 
   
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }

        this.timer.scheduleAtFixedRate(new TimerTask() { 
   

            @Override
            public void run() { 
   
                try { 
   
                    ScheduleMessageService.this.persist();
                } catch (Throwable e) { 
   
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }

从start方法中可以看到,这个时候就启动了定时器,开始从队列里获取数据了。 那么start方法是怎么被调用的呢?
在这里插入图片描述
在DefaultMessageStore中启动的。

接下来我们还是把注意力放在 ScheduleMessageService.start方法的执行过程吧。
通过源码追踪,就看到了这个方法

   public void executeOnTimeup() { 
   
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));

            long failScheduleOffset = offset;

            if (cq != null) { 
   
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) { 
   
                    try { 
   
                        long nextOffset = offset;
                        int i = 0;
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { 
   
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();

                            if (cq.isExtAddr(tagsCode)) { 
   
                                if (cq.getExt(tagsCode, cqExtUnit)) { 
   
                                    tagsCode = cqExtUnit.getTagsCode();
                                } else { 
   
                                    //can't find ext content.So re compute tags code.
                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                        tagsCode, offsetPy, sizePy);
                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                                }
                            }

                            long now = System.currentTimeMillis();
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                            long countdown = deliverTimestamp - now;

                            if (countdown <= 0) { 
   
                                MessageExt msgExt =
                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);

                                if (msgExt != null) { 
   
                                    try { 
   
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                        PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.defaultMessageStore
                                                .putMessage(msgInner);

                                        if (putMessageResult != null
                                            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { 
   
                                            continue;
                                        } else { 
   
                                            // XXX: warn and notify me
                                            log.error(
                                                "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                                msgExt.getTopic(), msgExt.getMsgId());
                                            ScheduleMessageService.this.timer.schedule(
                                                new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                    nextOffset), DELAY_FOR_A_PERIOD);
                                            ScheduleMessageService.this.updateOffset(this.delayLevel,
                                                nextOffset);
                                            return;
                                        }
                                    } catch (Exception e) { 
   
                                        /* * XXX: warn and notify me */
                                        log.error(
                                            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                                + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                                + offsetPy + ",sizePy=" + sizePy, e);
                                    }
                                }
                            } else { 
   
                                ScheduleMessageService.this.timer.schedule(
                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                                    countdown);
                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                return;
                            }
                        } // end of for

                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    } finally { 
   

                        bufferCQ.release();
                    }
                } // end of if (bufferCQ != null)
                else { 
   

                    long cqMinOffset = cq.getMinOffsetInQueue();
                    if (offset < cqMinOffset) { 
   
                        failScheduleOffset = cqMinOffset;
                        log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                            + cqMinOffset + ", queueId=" + cq.getQueueId());
                    }
                }
            } // end of if (cq != null)

            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);
        }

但是注意处理的逻辑就在这里了。 如果到了延迟时间,就发送消息 否则就继续进行延迟返送。
在这里插入图片描述
总结,RocketMQ的延迟消息,使用起来方便,而且解耦代码,但是配置的延迟时间不够灵活。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/234086.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号