
系列文章目录
文章目录
前言
恭喜所有看到本篇文章的小伙伴,成功解锁了RabbitMQ系列之高级特性插件版
延迟队列的内容?通过本文,你将清楚的了解到:什么是延时队列?延时队列使用场景?如何安装安装延时队列插件(rabbitmq_delayed_message_exchange)??本文最后,小名将上一篇文章的实例做一些修改来实现新的效果?

一、什么是延时队列
什么是延时队列?顾名思义:首先它是一种队列,再给它附加一个延迟消费队列消息的功能,也就是说延时队列中的元素是都是带时间属性的,可以指定队列中的消息在哪个时间点被消费。
简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
二、延时队列使用场景
拿第一个场景来说,系统创建订单之后,需要取消所有超过30分钟没有支付的订单。拿有“重度选择恐惧症”的小名来说吧,也许在加购物车,去支付这些操作都还好好的,突然在付款界面看到价格后停住了,发现这东西我不是那么的需要,就放弃支付了,相信一天之内这样的小伙伴不在少数。比如:小名03:15放弃支付了,小红03:16放弃支付了,小刚03:15放弃支付了,小王04:45放弃支付了……我们如何让系统知道在03:45通知小名和小刚,3:46通知小红,05:15通知小王呢?
再如后面的几个场景:发生新用户注册事件,三天后检查新注册用户的活动数据,然后通知没有任何活动记录的用户;发生商户一个月内还没上传商品信息事件,则冻结该商户的商铺;发生预定会议事件,判断离会议开始是否只有十分钟了,如果是,则通知各个与会人员。
2.2 省时省力的解决方法
这几种场景,你是不是感觉使用定时任务,轮询所有数据,每秒查一次,取出需要被处理的数据,然后运行相应的业务代码处理就可以了?的确如果数据量比较少,这样做即省时又省力。
2.3 上述做法的缺点
如果数据量比较大,并且时效性较强的场景“30分钟内没有支付的订单,自动取消订单并通知用户”,在很短时间内,没有支付的订单数据可能多到惊人,如果是活动期间甚至会达到百万甚至千万级别,对于这么庞大的数据量依旧使用上述的轮询方式,显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。
2.4 使用延时队列的必要性
如果使用延时消息队列,我们在创建订单的同时将时间推迟30分钟放入消息中间件中,等时间一到再取出消费即可。
三、RabbitMQ中的TTL
上文中小名已经解释过了,这里呢,帮大家简单回忆一下
需求:
模拟用户商城购买商品时的两种情况:1. 成功下单,2. 超时提醒
- 用户下单
- 用户下单后展示等待付款页面
- 在页面上点击付款的按钮,如果不超时,则跳转到付款成功页面
- 如果超时,则给用户发送消息通知,询问用户尚未付款,是否还需要?
配置代码:
//订单最多存在10s args.put("x-message-ttl", 10 * 1000); //声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", "ex.go.dlx"); //声明当前队列绑定的死信路由key args.put("x-dead-letter-routing-key", "go.dlx");
Time To Live(TTL)
RabbitMQ可以针对队列设置x-message-ttl(对消息进行单独设置,每条消息TTL可以不同),来控制消息的生存时间,如果超时,则消息变为dead letter(死信),简单来说:就是如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃。
四、安装延时队列插件(rabbitmq_delayed_message_exchange)
由于是外网,可能下载速度有些慢,小名在这里帮大家准备好了安装包,大家可以直接下载使用
地址:
https://wwp.lanzouq.com/ifWwA007nwmf
密码:
eamon
第一步: 下载好先将文件解压到本地电脑上(网盘要求,无法上传无法识别的*.ez文件,小名压缩了一下上传的)
第二步: 上传到服务器的RabbitMQ的插件目录(/rabbitmq/plugins)中
第三步: 进入RabbitMQ的安装目录下的sbin目录,执行下面命令让该插件生效
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
第四步: 重启RabbitMQ
关闭服务:
rabbitmqctl stop
启动服务:
rabbitmq-server -detached
五、实现插件版的延时队列的实例
5.1 新增场景
假设上篇文章的方式只是在某app内信息推送,后续添加新需求,比如1分钟发邮件,1小时短信提醒等等,我们就需要创建很多的队列用来接收不同的消息,而且我们并不能保证这些订单的是按顺序提醒的( 即:有可能存在队列中”A单“提醒时间长于队列中”B单“的时间戳),这时我们就需要一个更通用的方式来发送此类消息,这里我们用到了上述的延时队列插件rabbitmq_delayed_message_exchange
5.2 调整需求
- 用户下单
- 用户下单后展示等待付款页面
- 在页面上点击付款的按钮,如果不超时,则跳转到付款成功页面
- 如果超过10s,则给用户发送系统消息通知,询问用户尚未付款,是否还需要?
上文中小名已经实现了一个超时10s给用户发送消息的功能,接下来,我们对上篇文章的代码做如下
5.3 根据新需求修改代码
- 新增队列绑定
@Configuration public class DelayedConfig {
public static final String DELAYED_QUEUE_NAME = "q.delay.plugin"; public static final String DELAYED_EXCHANGE_NAME = "ex.delay.plugin"; public static final String DELAYED_ROUTING_KEY = "delay.plugin"; @Bean public Queue queueDelay() {
return new Queue(DELAYED_QUEUE_NAME); } @Bean public CustomExchange exchangeDelay() {
Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } @Bean public Binding bindingDelayPlugin(@Qualifier("queueDelay") Queue queue, @Qualifier("exchangeDelay") CustomExchange customExchange) {
return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs(); } }
- 监听器做一些修改
小名先说下需要修改的部分,翻遍大家对比,文末贴出完整版。
1)新增一个消费者
//插件延迟队列,监听 @RabbitListener(queues = DELAYED_QUEUE_NAME) public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody()); System.out.println("【※】当前时间:"+new Date().toString()+",延时队列收到消息:"+msg); }
2)新增生产者
@RabbitListener(queues = "q.go.dlx") public void dlxListener(Message message, Channel channel) throws IOException {
//省略…… //未支付,1min后给用户发邮箱信息 long t = System.currentTimeMillis(); String delayOneMin = String.valueOf(this.dateRoll(new Date(), Calendar.MINUTE, 1).getTime() - t); sendDelayMsgByPlugin(message.getBody()+"【邮箱消息】", delayOneMin); //未支付,1小时后给用户发短信 String delayOneHour = String.valueOf(this.dateRoll(new Date(), Calendar.HOUR, 1).getTime() - t); sendDelayMsgByPlugin(message.getBody()+"【短信消息】", delayOneHour); } } public void sendDelayMsgByPlugin(String msg, String delayTime) {
System.out.println("延迟时间"+delayTime); rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{
a.getMessageProperties().setDelay(Integer.valueOf(delayTime));//60*1000和Integer.valueOf(delayTime)的区别 return a; }); }
【完整版代码】
@Component @Slf4j public class MqListener {
@Autowired IPracticeDlxOrderService iPracticeDlxOrderService; @Autowired private RabbitTemplate rabbitTemplate; @RabbitListener(queues = "q.go.dlx") public void dlxListener(Message message, Channel channel) throws IOException {
System.out.println("支付超时"); Long id = Long.valueOf(new String(message.getBody(), "utf-8")); PracticeDlxOrder order = iPracticeDlxOrderService.lambdaQuery().eq(PracticeDlxOrder::getId, id).one(); Boolean payStatue = order.getPay(); //判断是否支付 if (!payStatue) {
//未支付,修改未超时 UpdateWrapper<PracticeDlxOrder> dlxOrder = new UpdateWrapper<>(); dlxOrder.eq("id", id); dlxOrder.set("timeout", 1); iPracticeDlxOrderService.update(dlxOrder); log.info("当前时间:{},收到请求,msg:{},delayTime:{}", new Date(), message, new Date().toString()); //未支付,10后给用户发app信息 sendDelayMsg(id); //未支付,1min后给用户发邮箱信息 long t = System.currentTimeMillis(); String delayOneMin = String.valueOf(this.dateRoll(new Date(), Calendar.MINUTE, 1).getTime() - t); sendDelayMsgByPlugin(message.getBody()+"【邮箱消息】", delayOneMin); //未支付,1小时后给用户发短信 String delayOneHour = String.valueOf(this.dateRoll(new Date(), Calendar.HOUR, 1).getTime() - t); sendDelayMsgByPlugin(message.getBody()+"【短信消息】", delayOneHour); } } public Date dateRoll(Date date, int i, int d) {
// 获取Calendar对象并以传进来的时间为准 Calendar calendar = Calendar.getInstance(); calendar.setTime(date); // 将现在的时间滚动固定时长,转换为Date类型赋值 calendar.add(i, d); // 转换为Date类型再赋值 date = calendar.getTime(); return date; } //死信队列监听 @RabbitListener(queues = "q.delay") public void delayListener(Message message, Channel channel) throws IOException {
System.out.println(new String(message.getBody())); } //插件延迟队列,监听 @RabbitListener(queues = DELAYED_QUEUE_NAME) public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody()); System.out.println("【※】当前时间:"+new Date().toString()+",延时队列收到消息:"+msg); } / * 未支付,10s后给用户发信息 */ public void sendDelayMsg(Long id){
rabbitTemplate.setMandatory(true); //id + 时间戳 全局唯一 Date date = DateUtil.getDate(new Date(),1,10); CorrelationData correlationData = new CorrelationData(date.toString()); //发送消息时指定 header 延迟时间 rabbitTemplate.convertAndSend("ex.delay", "q.delay", "您的订单号:" + id + "尚未付款,是否还需要?", new MessagePostProcessor() {
@Override public Message postProcessMessage(Message message) throws AmqpException {
//设置消息持久化 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); message.getMessageProperties().setDelay(10*1000); return message; } }, correlationData); } / * * @param msg * @param delayTime */ public void sendDelayMsgByPlugin(String msg, String delayTime) {
System.out.println("延迟时间"+delayTime); rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{
a.getMessageProperties().setDelay(Integer.valueOf(delayTime));//60*1000和Integer.valueOf(delayTime)的区别 return a; }); } }
- 运行结果:
支付超时 2022-02-17 17:28:10.650 INFO 18324 --- [ntContainer#2-1] e.mq.dlx.modules.listener.MqListener : 当前时间:Thu Feb 17 17:28:10 CST 2022,收到请求,msg:(Body:'' MessageProperties [headers={
x-first-death-exchange=ex.go, x-death=[{
reason=expired, count=1, exchange=ex.go, time=Thu Feb 17 17:28:08 CST 2022, routing-keys=[go], queue=q.go}], x-first-death-reason=expired, x-first-death-queue=q.go}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=ex.go.dlx, receivedRoutingKey=go.dlx, deliveryTag=1, consumerTag=amq.ctag-SasPqfbiS6-pt-e54uV5Hw, consumerQueue=q.go.dlx]),delayTime:Thu Feb 17 17:28:10 CST 2022 延迟时间60000 您的订单号:尚未付款,是否还需要? 延迟时间 2022-02-17 17:28:27.268 WARN 18324 --- [nectionFactory1] o.s.amqp.rabbit.core.RabbitTemplate : Returned message but no callback available 【※】当前时间:Thu Feb 17 17:29:27 CST 2022,延时队列收到消息:[B@36c9cd1【邮箱消息】 【※】当前时间:Thu Feb 17 18:28:44 CST 2022,延时队列收到消息:[B@36c9cd1【短信消息】
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/199439.html原文链接:https://javaforall.net
