RabbitMQ延迟队列

RabbitMQ延迟队列RabbitMQ 延迟队列使用场景消息延迟推送的 预支付订单创建成功后 30 分钟后还未完成支付则自动修改订单状态 自动取消订单 用户注册成功后 如果 3 天没有登录则进行短信提醒 优惠券过期前发送短信进行提醒等业务场景

目录

? 介绍

? 使用场景

?‍? 模拟案例

? 准备工作

? 写法一(死信队列TTL)

 RabbitMQ配置文件

 生产者

消费者

测试

? 写法二 (死信队列TTL)

 RabbitMQ配置文件

生产者

消费者

测试

? 写法三 (插件版本-推荐)

插件安装

RabbitMQ配置文件

生产者

消费者

测试

? 延迟队列方法推荐 


? 介绍

顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。

? 使用场景

  • 预支付订单创建成功后,30分钟后还没有支付,自动取消订单,修改订单状态
  • 用户注册成功后,如果3天没有登录则进行短信提醒
  • 优惠券过期前发送短信进行提醒
  • ….

以上场景都可以用延时队列来完成


?‍? 模拟案例

需求:生产者发布消息,10秒、60秒后消费者拿到消息进行消费

? 准备工作

导入RabbitMQ依赖

 
     
     
       org.springframework.boot 
      
     
       spring-boot-starter-amqp 
      
    

 配置RabbitMQ连接相关信息

#MySQL spring: rabbitmq: host: 127.0.0.1 port: 5672 username: xxxx password: xxx server: port: 8087

? 写法一(死信队列TTL)

生产者生产消息——>到交换机分发给对应的队列(A10秒过期,B60秒过期)——>过期后到死信交换机——>消费者进行消费(执行顺序如下图)RabbitMQ延迟队列

 RabbitMQ配置文件

import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; / * @author 小影 * @create: 2022/8/18 10:26 * @describe:mq配置 如示例图配置:2交换机、4队列、4路由key */ @Configuration public class RabbitMQConfiguration { // 延迟交换机 public static final String DELAY_EXCHANGE_NAME = "delay.exchange"; // 延迟队列 public static final String DELAY_QUEUE_NAME_A = "delay.queue.a"; public static final String DELAY_QUEUE_NAME_B = "delay.queue.b"; // 延迟队列路由key public static final String DELAY_QUEUE_ROUTING_KEY_A = "delay.routingKey.a"; public static final String DELAY_QUEUE_ROUTING_KEY_B = "delay.routingKey.b"; // 死信交换机 public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange"; // 死信队列 public static final String DEAD_LETTER_QUEUE_NAME_A = "dead.letter.queue.a"; public static final String DEAD_LETTER_QUEUE_NAME_B = "dead.letter.queue.b"; // 私信队列路由key public static final String DEAD_LETTER_ROUTING_KEY_A = "dead.letter.delay_10s.routingkey.a"; public static final String DEAD_LETTER_ROUTING_KEY_B = "dead.letter.delay_60s.routingkey.b"; // 声明延迟交换机 @Bean("delayExchange") public DirectExchange delayExchange() { return new DirectExchange(DELAY_EXCHANGE_NAME); } // 声明死信交换机 @Bean("deadLetterExchange") public DirectExchange deadLetterExchange() { return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME); } // 声明延迟队列A,延迟10s,并且绑定到对应的死信交换机 @Bean("delayQueueA") public Queue delayQueueA() { HashMap 
     
       args = new HashMap<>(); // 声明队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME); // 声明队列的属性路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_A); // 声明队列的消息TTL存活时间 args.put("x-message-ttl", 10000); return QueueBuilder.durable(DELAY_QUEUE_NAME_A).withArguments(args).build(); } // 声明延迟队列B,延迟60s,并且绑定到对应的死信交换机 @Bean("delayQueueB") public Queue delayQueueB() { HashMap 
      
        args = new HashMap<>(); // 声明队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME); // 声明队列的属性路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_B); // 声明队列的消息TTL存活时间 args.put("x-message-ttl", 60000); return QueueBuilder.durable(DELAY_QUEUE_NAME_B).withArguments(args).build(); } // 声明死信队列A,用于接收延迟10S的消息 @Bean("deadLetterQueueA") public Queue deadLetterQueueA() { return new Queue(DEAD_LETTER_QUEUE_NAME_A); } // 声明死信队列B,用于接收延迟60S的消息 @Bean("deadLetterQueueB") public Queue deadLetterQueueB() { return new Queue(DEAD_LETTER_QUEUE_NAME_B); } // 设置延迟队列A的绑定关系 @Bean public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue, @Qualifier("delayExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY_A); } // 设置延迟队列B的绑定关系 @Bean public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue, @Qualifier("delayExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY_B); } // 设置死信队列A的绑定关系 @Bean public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY_A); } // 设置死信队列B的绑定关系 @Bean public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY_B); } } 
       
     

此配置文件的代码关系图如下

RabbitMQ延迟队列

 生产者

import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME; import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY_A; import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY_B; / * @author 小影 * @create: 2022/8/18 11:13 * @describe:延迟消息生产者 */ @Component public class DelayMessageProducer { @Resource private RabbitTemplate rabbitTemplate; public void send(String message,int type) { switch (type){ case 1: // 10s的消息 // param:队列名称、路由key、消息 rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY_A, message); break; case 2:// 60s的消息 rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY_B, message); break; } } }

消费者

import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME_A; import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME_B; / * @author 小影 * @create: 2022/8/18 11:19 * @describe:死信消费者 */ @Slf4j @Component public class DeadLetterQueueConsumer { / * 监听私信队列A * @param message * @param channel 作手动回执、确认 */ @RabbitListener(queues = DEAD_LETTER_QUEUE_NAME_A) public void receiveA(Message message, Channel channel) { String msg = new String(message.getBody()); log.info("当前时间:{},死信队列A收到消息:{}", LocalDateTime.now(),msg); } / * 监听私信队列B * @param message * @param channel 作手动回执、确认 */ @RabbitListener(queues = DEAD_LETTER_QUEUE_NAME_B) public void receiveB(Message message, Channel channel) { String msg = new String(message.getBody()); log.info("当前时间:{},死信队列B收到消息:{}", LocalDateTime.now(),msg); } } 

测试

@Slf4j @RestController @RequestMapping("rabbitmq") public class RabbitMqController { @Resource private DelayMessageProducer producer; @GetMapping("send") public void send(String message, Integer type) { log.info("当前时间:{},消息:{},延迟类型:{}", LocalDateTime.now(), message, Objects.requireNonNull(type)); producer.send(message, type); } }

分别请求

http://localhost:8089/rabbitmq/send?message=我是10秒&type=1

http://localhost:8089/rabbitmq/send?message=我是60秒&type=2

RabbitMQ延迟队列

如果出现异常:Channel shutdown: channel error; protocol method:#method(reply-code=406, reply-text=PRECONDITION_FAILED – inequivalent arg ‘type’ for exchange ‘delay.exchange’ in vhost ‘/’: received ”x-delayed-message” but current is ‘direct’, class-id=40, method-id=10

可能是mq已经存在交换机了先去删掉

弊端:后期要扩展其他不同延时的时间,就需要增加延时的配置,非常麻烦


? 写法二 (死信队列TTL)

生产者生产消息(并设置过期时间)——>到交换机分发给延迟队列——>过期后到死信交换机——>消费者进行消费(执行顺序如下图)

RabbitMQ延迟队列

 RabbitMQ配置文件

import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; / * @author 小影 * @create: 2022/8/18 10:26 * @describe:mq配置 如示例图配置:2交换机、2队列、2路由key */ @Configuration public class RabbitMQConfiguration { // 延迟交换机 public static final String DELAY_EXCHANGE_NAME = "delay.exchange"; // 延迟队列 public static final String DELAY_QUEUE_NAME = "delay.queue"; // 延迟队列路由key public static final String DELAY_QUEUE_ROUTING_KEY = "delay.routingKey"; // 死信交换机 public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange"; // 死信队列 public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue"; // 私信队列路由key public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.routingkey"; // 声明延迟交换机 @Bean("delayExchange") public DirectExchange delayExchange() { return new DirectExchange(DELAY_EXCHANGE_NAME); } // 声明死信交换机 @Bean("deadLetterExchange") public DirectExchange deadLetterExchange() { return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME); } // 声明延迟队列,不设置存活时间,并且绑定到对应的死信交换机 @Bean("delayQueue") public Queue delayQueue() { HashMap 
      
        args = new HashMap<>(); // 声明队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME); // 声明队列的属性路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY); return QueueBuilder.durable(DELAY_QUEUE_NAME).withArguments(args).build(); } // 声明死信队列 @Bean("deadLetterQueue") public Queue deadLetterQueue() { return new Queue(DEAD_LETTER_QUEUE_NAME); } // 设置延迟队列的绑定关系 @Bean public Binding delayBinding(@Qualifier("delayQueue") Queue queue, @Qualifier("delayExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY); } // 设置死信队列的绑定关系 @Bean public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY); } } 
      

生产者

import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME; import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY; / * @author 小影 * @create: 2022/8/18 11:13 * @describe:延迟消息生产者 */ @Component public class DelayMessageProducer { @Resource private RabbitTemplate rabbitTemplate; / * * @param message 消息 * @param delayTime 存活时间 */ public void send(String message,String delayTime) { // param:延迟交换机,路由KEY,存活时间 rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY, message, msg -> { msg.getMessageProperties().setExpiration(delayTime); return msg; }); } }

消费者

import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME; / * @author 小影 * @create: 2022/8/18 11:19 * @describe:死信消费者 */ @Slf4j @Component public class DeadLetterQueueConsumer { / * 监听私信队列A * @param message * @param channel 作手动回执、确认 */ @RabbitListener(queues = DEAD_LETTER_QUEUE_NAME) public void receiveA(Message message, Channel channel) { String msg = new String(message.getBody()); log.info("当前时间:{},死信队列收到消息:{}", LocalDateTime.now(),msg); } }

测试

@Slf4j @RestController @RequestMapping("rabbitmq") public class RabbitMqController { @Resource private DelayMessageProducer producer; @GetMapping("send") public void send(String message, String delayTime) { log.info("当前时间:{},消息:{},存活时间:{}", LocalDateTime.now(), message, delayTime); producer.send(message, delayTime); } }

分别请求

http://localhost:8089/rabbitmq/send?message=我是60秒&delayTime=60000

http://localhost:8089/rabbitmq/send?message=我是10秒&delayTime=10000

弊端:由于是先进先出的,如果60秒进去了,10秒在进去,10秒结束了,他要等60秒结束,60秒出来10秒才能出来


? 写法三 (插件版本-推荐)

安装插件后会生成新的Exchange类型 x-delayed-message ,该类型消息支持延迟投递机制,接收消息后并未立即将消息投递至目标队列,而是存储在mnesia(一个分布式数据库)中,随后检测消息延迟时间,如达到投递时间讲其通过 x-delayed-type 类型标记的交换机投至目标队列。 

RabbitMQ延迟队列

插件安装

1.进入mq官网社区插件:Community Plugins — RabbitMQ

2.找到rabbitmq_delayed_message_exchange

RabbitMQ延迟队列

 选择对应版本的ez文件下载

 Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

 RabbitMQ延迟队列

 注:我的MQ是通过yum安装的

 1.在系统中查看安装的rabbitmq

rpm -qa |grep rabbitmq

RabbitMQ延迟队列

 2.查询mq的安装的相关文件目录

rpm -ql rabbitmq-server-3.10.7-1.el8.noarch

RabbitMQ延迟队列

 翻到最下面发现mnesia的安装目录; mnesia=分布式数据库,看看就好

RabbitMQ延迟队列

 然后把我们下载的ez安装包解压放到 /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.7/plugins 里面

RabbitMQ延迟队列

3.重启RabbitMQ服务

systemctl restart rabbitmq-server.service

4.重启插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 RabbitMQ延迟队列


RabbitMQ配置文件

/ * @author 小影 * @create: 2022/8/18 10:26 * @describe:mq配置 如示例图配置:1交换机、1队列、1路由key */ @Configuration public class RabbitMQConfiguration { // 延迟交换机 public static final String DELAY_EXCHANGE_NAME = "delay.exchange"; // 延迟队列 public static final String DELAY_QUEUE_NAME = "delay.queue"; // 延迟队列路由key public static final String DELAY_QUEUE_ROUTING_KEY = "delay.routingKey"; // 声明延迟交换机 @Bean("delayExchange") public CustomExchange delayExchange() { HashMap 
        
          args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,false,args); } // 声明延迟队列 @Bean("delayQueue") public Queue delayQueue() { return new Queue(DELAY_QUEUE_NAME); } // 设置延迟队列的绑定关系 @Bean public Binding delayBinding(@Qualifier("delayQueue") Queue queue, @Qualifier("delayExchange") CustomExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY).noargs(); } } 
        

生产者

import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME; import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY; / * @author 小影 * @create: 2022/8/18 11:13 * @describe:延迟消息生产者 */ @Component public class DelayMessageProducer { @Resource private RabbitTemplate rabbitTemplate; / * * @param message 消息 * @param delayTime 存活时间 */ public void send(String message,Integer delayTime) { // param:延迟交换机,路由KEY,存活时间 rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY, message, msg -> { msg.getMessageProperties().setDelay(delayTime); return msg; }); } }

消费者

import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_NAME; / * @author 小影 * @create: 2022/8/18 11:19 * @describe:消费者 */ @Slf4j @Component public class DeadLetterQueueConsumer { /* * 监听私信队列 * @param message * @param channel 作手动回执、确认 */ @RabbitListener(queues = DELAY_QUEUE_NAME) public void receiveA(Message message, Channel channel) { String msg = new String(message.getBody()); log.info("当前时间:{},延迟队列收到消息:{}", LocalDateTime.now(),msg); } }

测试

@Slf4j @RestController @RequestMapping("rabbitmq") public class RabbitMqController { @Resource private DelayMessageProducer producer; @GetMapping("send") public void send(String message, Integer delayTime) { log.info("当前时间:{},消息:{},存活时间:{}", LocalDateTime.now(), message, delayTime); producer.send(message, delayTime); } }

启动项目查看rabbitmq的可视化界面

如下图此时生成的交换机是x-delayed-message类型的

RabbitMQ延迟队列

 分别发送:

http://localhost:8089/rabbitmq/send?message=我是60秒&delayTime=60000

http://localhost:8089/rabbitmq/send?message=我是10秒&delayTime=10000

RabbitMQ延迟队列

 结局并不是60秒先被消费,完成了我们的意愿。

原理:

  1. 交换机里面有个数据库,生产者生产信息把这个信息放入数据库中
  2. 交换机里面的插件就会一直监听这个时间
  3. 时间到了把对应数据取出来,放入队列,让消费者进行消费

? 延迟队列方法推荐 

RabbitMQ延迟队列

 这是小编在开发学习使用和总结,  这中间或许也存在着不足,希望可以得到大家的理解和建议。如有侵权联系小编!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/211458.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月18日 下午10:23
下一篇 2026年3月18日 下午10:23


相关推荐

  • JAVA实现二维码扫码登录「建议收藏」

    实现客户端扫码登录分为下列四步.

    2022年4月12日
    419
  • Embedding相似度虚高,如何用langchain+Milvus搭建CRAG解决?

    Embedding相似度虚高,如何用langchain+Milvus搭建CRAG解决?

    2026年3月12日
    2
  • css 第二行换行,CSS换行详解

    css 第二行换行,CSS换行详解普通文本段落的换行案例说明 在实际应用中我们常遇到这样的问题 就是一段文本 有了明确的宽度 需要文本自动换行 如下图 案例分析 上面两个图 没有使用任何排版的 CSS 属性 也就是默认情况下 块元素里的文本 达到边缘以后会自动换行 但是这里有个特例 就是内容全部为英文字母 没有单词句子 没有词和词之间的空格 此时文本就不自动换行了 见下图 特殊情况 有时候你写的东东 默认情况下并不自动换行 这多半

    2026年3月18日
    2
  • 2021最新kali安装中文输入法

    2021最新kali安装中文输入法2021 最新 kali 安装中文输入法 1 打开 kali 命令终端 2 安装输入法框架 fcitx 3 安装输入法 4 配置 google 拼音输入法 1 打开 kali 命令终端 2 安装输入法框架 fcitx1 输入 sudoaptinsta 因为是普通用户所以需要输入登录密码进行认证 2 输入 y 进行确认 或者直接回车默认是 y 3 安装完成 3 安装输入法 1 输入 sudoaptinsta googlepinyin 直接回车 默认为 y 或者输入 y 回车

    2026年3月18日
    2
  • html 模板字符串,ES6:模板字符串

    html 模板字符串,ES6:模板字符串标签 模板字符串 JavaScriptES 前端 web 本博客版权归本人和饥人谷所有 转载需说明来源内容转载自阮一峰老师的 ES6 入门 1 基本用法传统的 JavaScript 语言 输出模板通常是这样写的 result append Thereare basket count itemsinyourb basket onSa

    2026年3月26日
    3
  • matlab数字图像处理常用操作

    matlab数字图像处理常用操作把图片加载到当前文件夹 strong 步骤 strong 1 读取图片 2 灰度化 3 二值化 4 去噪音 5 腐蚀膨胀 strong 过程 strong 1 读取图片 2 图像

    2026年3月16日
    2

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号