【外行也能看懂的RabbitMQ系列(四)】—— RabbitMQ进阶篇之通过插件实现延迟队列(内含实现代码及rabbitmq_delayed_message_exchange安装)

【外行也能看懂的RabbitMQ系列(四)】—— RabbitMQ进阶篇之通过插件实现延迟队列(内含实现代码及rabbitmq_delayed_message_exchange安装)恭喜所有看到本篇文章的小伙伴 成功解锁了 RabbitMQ 系列之高级特性插件版延迟队列的内容通过本文 你将清楚的了解到 什么是延时队列 延时队列使用场景 如何安装安装延时队列插件 rabbitmq delayed message exchange 本文最后 小名将上一篇文章的实例做一些修改来实现新的效果

在这里插入图片描述

系列文章目录



前言

恭喜所有看到本篇文章的小伙伴,成功解锁了RabbitMQ系列之高级特性插件版
延迟队列的内容?通过本文,你将清楚的了解到:什么是延时队列?延时队列使用场景?如何安装安装延时队列插件(rabbitmq_delayed_message_exchange)??本文最后,小名将上一篇文章的实例做一些修改来实现新的效果?

在这里插入图片描述


一、什么是延时队列

什么是延时队列?顾名思义:首先它是一种队列,再给它附加一个延迟消费队列消息的功能,也就是说延时队列中的元素是都是带时间属性的,可以指定队列中的消息在哪个时间点被消费。

简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

二、延时队列使用场景

拿第一个场景来说,系统创建订单之后,需要取消所有超过30分钟没有支付的订单。拿有“重度选择恐惧症”的小名来说吧,也许在加购物车,去支付这些操作都还好好的,突然在付款界面看到价格后停住了,发现这东西我不是那么的需要,就放弃支付了,相信一天之内这样的小伙伴不在少数。比如:小名03:15放弃支付了,小红03:16放弃支付了,小刚03:15放弃支付了,小王04:45放弃支付了……我们如何让系统知道在03:45通知小名和小刚,3:46通知小红,05:15通知小王呢?

再如后面的几个场景:发生新用户注册事件,三天后检查新注册用户的活动数据,然后通知没有任何活动记录的用户;发生商户一个月内还没上传商品信息事件,则冻结该商户的商铺;发生预定会议事件,判断离会议开始是否只有十分钟了,如果是,则通知各个与会人员。

2.2 省时省力的解决方法
这几种场景,你是不是感觉使用定时任务,轮询所有数据,每秒查一次,取出需要被处理的数据,然后运行相应的业务代码处理就可以了?的确如果数据量比较少,这样做即省时又省力。

2.3 上述做法的缺点
如果数据量比较大,并且时效性较强的场景“30分钟内没有支付的订单,自动取消订单并通知用户”,在很短时间内,没有支付的订单数据可能多到惊人,如果是活动期间甚至会达到百万甚至千万级别,对于这么庞大的数据量依旧使用上述的轮询方式,显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

2.4 使用延时队列的必要性
如果使用延时消息队列,我们在创建订单的同时将时间推迟30分钟放入消息中间件中,等时间一到再取出消费即可。

三、RabbitMQ中的TTL

上文中小名已经解释过了,这里呢,帮大家简单回忆一下

需求:

模拟用户商城购买商品时的两种情况:1. 成功下单,2. 超时提醒

  1. 用户下单
  2. 用户下单后展示等待付款页面
  3. 在页面上点击付款的按钮,如果不超时,则跳转到付款成功页面
  4. 如果超时,则给用户发送消息通知,询问用户尚未付款,是否还需要?

配置代码:

//订单最多存在10s args.put("x-message-ttl", 10 * 1000); //声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", "ex.go.dlx"); //声明当前队列绑定的死信路由key args.put("x-dead-letter-routing-key", "go.dlx"); 

Time To Live(TTL)
RabbitMQ可以针对队列设置x-message-ttl(对消息进行单独设置,每条消息TTL可以不同),来控制消息的生存时间,如果超时,则消息变为dead letter(死信),简单来说:就是如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃。

四、安装延时队列插件(rabbitmq_delayed_message_exchange)

由于是外网,可能下载速度有些慢,小名在这里帮大家准备好了安装包,大家可以直接下载使用

地址:

https://wwp.lanzouq.com/ifWwA007nwmf 

密码:

eamon 

第一步: 下载好先将文件解压到本地电脑上(网盘要求,无法上传无法识别的*.ez文件,小名压缩了一下上传的)
第二步: 上传到服务器的RabbitMQ的插件目录(/rabbitmq/plugins)中
第三步: 进入RabbitMQ的安装目录下的sbin目录,执行下面命令让该插件生效




rabbitmq-plugins enable rabbitmq_delayed_message_exchange 

第四步: 重启RabbitMQ

关闭服务:

rabbitmqctl stop 

启动服务:

rabbitmq-server -detached 

五、实现插件版的延时队列的实例

5.1 新增场景

假设上篇文章的方式只是在某app内信息推送,后续添加新需求,比如1分钟发邮件,1小时短信提醒等等,我们就需要创建很多的队列用来接收不同的消息,而且我们并不能保证这些订单的是按顺序提醒的( 即:有可能存在队列中”A单“提醒时间长于队列中”B单“的时间戳),这时我们就需要一个更通用的方式来发送此类消息,这里我们用到了上述的延时队列插件rabbitmq_delayed_message_exchange

5.2 调整需求

  1. 用户下单
  2. 用户下单后展示等待付款页面
  3. 在页面上点击付款的按钮,如果不超时,则跳转到付款成功页面
  4. 如果超过10s,则给用户发送系统消息通知,询问用户尚未付款,是否还需要?
    上文中小名已经实现了一个超时10s给用户发送消息的功能,接下来,我们对上篇文章的代码做如下

5.3 根据新需求修改代码

  1. 新增队列绑定
@Configuration public class DelayedConfig { 
       public static final String DELAYED_QUEUE_NAME = "q.delay.plugin"; public static final String DELAYED_EXCHANGE_NAME = "ex.delay.plugin"; public static final String DELAYED_ROUTING_KEY = "delay.plugin"; @Bean public Queue queueDelay() { 
       return new Queue(DELAYED_QUEUE_NAME); } @Bean public CustomExchange exchangeDelay() { 
       Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } @Bean public Binding bindingDelayPlugin(@Qualifier("queueDelay") Queue queue, @Qualifier("exchangeDelay") CustomExchange customExchange) { 
       return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs(); } } 
  1. 监听器做一些修改
    小名先说下需要修改的部分,翻遍大家对比,文末贴出完整版。

1)新增一个消费者

 //插件延迟队列,监听 @RabbitListener(queues = DELAYED_QUEUE_NAME) public void receiveD(Message message, Channel channel) throws IOException { 
       String msg = new String(message.getBody()); System.out.println("【※】当前时间:"+new Date().toString()+",延时队列收到消息:"+msg); } 

2)新增生产者

@RabbitListener(queues = "q.go.dlx") public void dlxListener(Message message, Channel channel) throws IOException { 
       //省略…… //未支付,1min后给用户发邮箱信息 long t = System.currentTimeMillis(); String delayOneMin = String.valueOf(this.dateRoll(new Date(), Calendar.MINUTE, 1).getTime() - t); sendDelayMsgByPlugin(message.getBody()+"【邮箱消息】", delayOneMin); //未支付,1小时后给用户发短信 String delayOneHour = String.valueOf(this.dateRoll(new Date(), Calendar.HOUR, 1).getTime() - t); sendDelayMsgByPlugin(message.getBody()+"【短信消息】", delayOneHour); } } public void sendDelayMsgByPlugin(String msg, String delayTime) { 
       System.out.println("延迟时间"+delayTime); rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{ 
       a.getMessageProperties().setDelay(Integer.valueOf(delayTime));//60*1000和Integer.valueOf(delayTime)的区别 return a; }); } 

【完整版代码】

@Component @Slf4j public class MqListener { 
       @Autowired IPracticeDlxOrderService iPracticeDlxOrderService; @Autowired private RabbitTemplate rabbitTemplate; @RabbitListener(queues = "q.go.dlx") public void dlxListener(Message message, Channel channel) throws IOException { 
       System.out.println("支付超时"); Long id = Long.valueOf(new String(message.getBody(), "utf-8")); PracticeDlxOrder order = iPracticeDlxOrderService.lambdaQuery().eq(PracticeDlxOrder::getId, id).one(); Boolean payStatue = order.getPay(); //判断是否支付 if (!payStatue) { 
      //未支付,修改未超时 UpdateWrapper<PracticeDlxOrder> dlxOrder = new UpdateWrapper<>(); dlxOrder.eq("id", id); dlxOrder.set("timeout", 1); iPracticeDlxOrderService.update(dlxOrder); log.info("当前时间:{},收到请求,msg:{},delayTime:{}", new Date(), message, new Date().toString()); //未支付,10后给用户发app信息 sendDelayMsg(id); //未支付,1min后给用户发邮箱信息 long t = System.currentTimeMillis(); String delayOneMin = String.valueOf(this.dateRoll(new Date(), Calendar.MINUTE, 1).getTime() - t); sendDelayMsgByPlugin(message.getBody()+"【邮箱消息】", delayOneMin); //未支付,1小时后给用户发短信 String delayOneHour = String.valueOf(this.dateRoll(new Date(), Calendar.HOUR, 1).getTime() - t); sendDelayMsgByPlugin(message.getBody()+"【短信消息】", delayOneHour); } } public Date dateRoll(Date date, int i, int d) { 
       // 获取Calendar对象并以传进来的时间为准 Calendar calendar = Calendar.getInstance(); calendar.setTime(date); // 将现在的时间滚动固定时长,转换为Date类型赋值 calendar.add(i, d); // 转换为Date类型再赋值 date = calendar.getTime(); return date; } //死信队列监听 @RabbitListener(queues = "q.delay") public void delayListener(Message message, Channel channel) throws IOException { 
       System.out.println(new String(message.getBody())); } //插件延迟队列,监听 @RabbitListener(queues = DELAYED_QUEUE_NAME) public void receiveD(Message message, Channel channel) throws IOException { 
       String msg = new String(message.getBody()); System.out.println("【※】当前时间:"+new Date().toString()+",延时队列收到消息:"+msg); } / * 未支付,10s后给用户发信息 */ public void sendDelayMsg(Long id){ 
       rabbitTemplate.setMandatory(true); //id + 时间戳 全局唯一 Date date = DateUtil.getDate(new Date(),1,10); CorrelationData correlationData = new CorrelationData(date.toString()); //发送消息时指定 header 延迟时间 rabbitTemplate.convertAndSend("ex.delay", "q.delay", "您的订单号:" + id + "尚未付款,是否还需要?", new MessagePostProcessor() { 
       @Override public Message postProcessMessage(Message message) throws AmqpException { 
       //设置消息持久化 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); message.getMessageProperties().setDelay(10*1000); return message; } }, correlationData); } / * * @param msg * @param delayTime */ public void sendDelayMsgByPlugin(String msg, String delayTime) { 
       System.out.println("延迟时间"+delayTime); rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{ 
       a.getMessageProperties().setDelay(Integer.valueOf(delayTime));//60*1000和Integer.valueOf(delayTime)的区别 return a; }); } } 
  1. 运行结果:
支付超时 2022-02-17 17:28:10.650 INFO 18324 --- [ntContainer#2-1] e.mq.dlx.modules.listener.MqListener : 当前时间:Thu Feb 17 17:28:10 CST 2022,收到请求,msg:(Body:'' MessageProperties [headers={ 
      x-first-death-exchange=ex.go, x-death=[{ 
      reason=expired, count=1, exchange=ex.go, time=Thu Feb 17 17:28:08 CST 2022, routing-keys=[go], queue=q.go}], x-first-death-reason=expired, x-first-death-queue=q.go}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=ex.go.dlx, receivedRoutingKey=go.dlx, deliveryTag=1, consumerTag=amq.ctag-SasPqfbiS6-pt-e54uV5Hw, consumerQueue=q.go.dlx]),delayTime:Thu Feb 17 17:28:10 CST 2022 延迟时间60000 您的订单号:尚未付款,是否还需要? 延迟时间 2022-02-17 17:28:27.268 WARN 18324 --- [nectionFactory1] o.s.amqp.rabbit.core.RabbitTemplate : Returned message but no callback available 【※】当前时间:Thu Feb 17 17:29:27 CST 2022,延时队列收到消息:[B@36c9cd1【邮箱消息】 【※】当前时间:Thu Feb 17 18:28:44 CST 2022,延时队列收到消息:[B@36c9cd1【短信消息】 
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/199439.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月20日 下午12:58
下一篇 2026年3月20日 下午12:58


相关推荐

  • 计算机ata考试试题答案,计算机ATA考试(高级)第一套试卷

    计算机ata考试试题答案,计算机ATA考试(高级)第一套试卷计算机ATA考试(高级)第一套试题一、启动资源管理器二、在C盘的根目录下新建文件夹,文件名为“4000001”三、将C盘下“KSML2”文件夹内的文件KS1-7.DOC、KS2-5.DOC、KS3-14.DOC、KS4-20.XLS、KS5-8.XLS、KS6-6.ppt、KS7-18.XLS、KS8-4.PST一次性复制到C盘下4000001文件夹中,并分别重命名为A1.DOC、A2.DOC、A…

    2022年7月13日
    27
  • Python之深入解析一行代码计算每个省面积的神器Geopandas

    Python之深入解析一行代码计算每个省面积的神器Geopandas一 前言 GeoPandas 是一个基于 pandas 针对地理数据做了特别支持的第三方模块 它继承 pandas Series 和 pandas Dataframe 实现了 GeoSeries 和 GeoDataFrame 类 使得其操纵和分析平面几何对象非常方便 二 准备 Python 安装在 windows 上安装 Python 下载 Python 的最新版本 访问链接 Python 官网 在 Windows 操作系统上安装 Python3 10 0 大家也可以自行选择最新版

    2025年8月21日
    6
  • KMS模拟器VLMCSD再次更新

    KMS模拟器VLMCSD再次更新对许多 windows 用户来说 KMS 模拟器的出现简直是个福音 帮助大量用户解决了 windows office 的激活问题 KMS 模拟器从最早的虚拟机镜像移植逐步进化到了现在的 RPC 报文模拟 VLMCSD 的最近一次更新是 2016 年 3 月 7 日 关于如何使用 可以点这里 这篇文章是以 2015 年的版本写的 VLMCSD 发布地址 http forums mydigitallif info thre

    2026年3月17日
    2
  • 2026年郑州热力集团开展非采暖期首次供热管网冷态运行

    2026年郑州热力集团开展非采暖期首次供热管网冷态运行

    2026年3月14日
    1
  • 树莓派拓展模拟量采集(AD)功能

    树莓派拓展模拟量采集(AD)功能1 前言树莓派自身不带有模拟量采集功能 A D 功能 当需要 AD 功能时 常通过 IIC 外接一个 A D 模块来实现 如 8 位 A D 芯片 PCA9685 本文首先简要介绍 PCA9685 特性 然后基于树莓派的 Bcm2835 库开发 PCA9685 的驱动库 树莓派安装 Bcm2835 库参考这篇文章 2 PCA9685PCA96 芯片有 4 路 8 位的 A D 采集通道和一路 8 位 D A 输出通道 其他通过 IIC 与外部通信 其芯片引脚见下图 其工作电压 VDD 可以 2 5 6V 参考电压 VREF 为 A D 转化参考电压可以与 VDD 一致也可不一致

    2026年3月17日
    2
  • OpenClaw多Agent协作完全指南

    OpenClaw多Agent协作完全指南

    2026年3月12日
    2

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号