【延时任务处理、订单失效】RabbitMQ死信队列实现

【延时任务处理、订单失效】RabbitMQ死信队列实现订单失效问题订单失效问题比较麻烦的地方就是如何能够实时获取失效的订单 对于这种问题一般有两种解决方案 定时任务处理 延时任务处理定时任务处理用户下订单后先生成订单信息 然后将该订单加入到定时任务中 30 分钟后执行 当到达指定时间后检查订单状态 如果未支付则标识该订单失效 定时去轮询数据库 缓存 看订单的状态 这种方式的问题很明显 当集群部署服务器的时候需要做分布式锁进行协调 而且实时性不高 对数据库会产生压力延时任务处理当用户下订单后 将用户的订单的标识全部发送到延时队列中 3

【订单失效】RabbitMQ死信队列实现

之前做商城遇到一个关于订单未支付超时失效的问题,总结一下

1.订单失效问题

订单失效问题比较麻烦的地方就是如何能够实时获取失效的订单。

对于这种问题一般有两种解决方案: 定时任务处理,延时任务处理

2.定时任务处理

  1. 用户下订单后先生成订单信息,然后将该订单加入到定时任务中(30分钟后执行),当到达指定时间后检查订单状态,如果未支付则标识该订单失效。
  2. 定时去轮询数据库/缓存,看订单的状态。这种方式的问题很明显,当集群部署服务器的时候需要做分布式锁进行协调,而且实时性不高,对数据库会产生压力

3.延时任务处理

当用户下订单后,将用户的订单的标识全部发送到延时队列中,30分钟后进去消费队列中被消费,消费时先检查该订单的状态,如果未支付则标识该订单失效。

有以下几种延时任务处理方式

  • Java自带的DelayedQuene队列

这是java本身提供的一种延时队列,如果项目业务复杂性不高可以考虑这种方式。它是使用jvm内存来实现的,停机会丢失数据,扩展性不强

  • 使用redis监听key的过期来实现

当用户下订单后把订单信息设置为redis的key,30分钟失效,程序编写监听redis的key失效,然后处理订单(我也尝试过这种方式)。这种方式最大的弊端就是只能监听一台redis的key失效,集群下将无法实现,也有人监听集群下的每个redis节点的key,但我认为这样做很不合适。如果项目业务复杂性不高,redis单机部署,就可以考虑这种方式

  • RabbitMQ死信队列实现

重点介绍这种方式

4.RabbitMQ死信队列实现监听订单失效

AMQP协议和RabbitMQ队列本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能。

【延时任务处理、订单失效】RabbitMQ死信队列实现

  • Time To Live(TTL)

RabbitMQ可以针对Queue设置x-expires 或者 针对Message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)

  • Dead Letter Exchanges(DLX)

下面来做一个例子来实现订单失效,为了效果明显,我们把订单的失效时间设置为10秒 (java实现)

5.具体实现

5.1.环境/版本一览:

  • 开发工具:Intellij IDEA 2020.2.3
  • springboot:2.4.1
  • jdk:1.8.0_211
  • maven: 3.6.3
  • RabbitMQ:3.8.9

docker安装rabbitmq可以查看我这一篇博客  docker 安装rabbitMQ :https://laoniu.blog.csdn.net/article/details/

项目结构

【延时任务处理、订单失效】RabbitMQ死信队列实现

5.2.项目搭建

最下方有完整代码

  • 新建项目,配置pom.xml加入依赖
 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
  • 配置RabbitMQ的队列和路由信息  创建类 RabbitMQConfiuration
package com.niu.springbootrabbitmqdelayqueue.config; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; / * @description: rabbitMQ配置信息 * @author: nxq email:  * @createDate: 2020/12/18 8:09 上午 * @updateUser: nxq email:  * @updateDate: 2020/12/18 8:09 上午 * @updateRemark: * @version: 1.0 / @Configuration public class RabbitMQConfiguration { //队列名称 public final static String orderQueue = "order_queue"; //交换机名称 public final static String orderExchange = "order_exchange"; // routingKey public final static String routingKeyOrder = "routing_key_order"; //死信消息队列名称 public final static String dealQueueOrder = "deal_queue_order"; //死信交换机名称 public final static String dealExchangeOrder = "deal_exchange_order"; //死信 routingKey public final static String deadRoutingKeyOrder = "dead_routing_key_order"; //死信队列 交换机标识符 public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange"; //死信队列交换机绑定键标识符 public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key"; @Autowired private CachingConnectionFactory connectionFactory; @Bean public Queue orderQueue() { // 将普通队列绑定到死信队列交换机上 Map<String, Object> args = new HashMap<>(2); //args.put("x-message-ttl", 5 * 1000);//直接设置 Queue 延迟时间 但如果直接给队列设置过期时间,这种做法不是很灵活 //这里采用发送消息动态设置延迟时间,这样我们可以灵活控制 args.put(DEAD_LETTER_QUEUE_KEY, dealExchangeOrder); args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKeyOrder); return new Queue(RabbitMQConfiguration.orderQueue, true, false, false, args); } //声明一个direct类型的交换机 @Bean DirectExchange orderExchange() { return new DirectExchange(RabbitMQConfiguration.orderExchange); } //绑定Queue队列到交换机,并且指定routingKey @Bean Binding bindingDirectExchangeDemo5( ) { return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(routingKeyOrder); } //创建配置死信队列 @Bean public Queue deadQueueOrder() { Queue queue = new Queue(dealQueueOrder, true); return queue; } //创建死信交换机 @Bean public DirectExchange deadExchangeOrder() { return new DirectExchange(dealExchangeOrder); } //死信队列与死信交换机绑定 @Bean public Binding bindingDeadExchange() { return BindingBuilder.bind(deadQueueOrder()).to(deadExchangeOrder()).with(deadRoutingKeyOrder); } } 
  • 配置消息生产者
package com.niu.springbootrabbitmqdelayqueue.controller; import com.niu.springbootrabbitmqdelayqueue.config.RabbitMQConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.UUID; / * @description: 订单的控制器 * @author: nxq email:  * @createDate: 2020/12/18 8:20 上午 * @updateUser: nxq email:  * @updateDate: 2020/12/18 8:20 上午 * @updateRemark: * @version: 1.0 / @RestController @RequestMapping("/order") public class OrderController { private static final Logger logger = LoggerFactory.getLogger(OrderController.class); @Autowired private AmqpTemplate rabbitTemplate; / * 模拟提交订单 * @author nxq * @return java.lang.Object */ @GetMapping("") public Object submit(){ String orderId = UUID.randomUUID().toString(); logger.info("submit order {}", orderId); this.rabbitTemplate.convertAndSend( RabbitMQConfiguration.orderExchange, //发送至订单交换机 RabbitMQConfiguration.routingKeyOrder, //订单定routingKey orderId //订单号 这里可以传对象 比如直接传订单对象 , message -> { // 如果配置了 params.put("x-message-ttl", 5 * 1000); // 那么这一句也可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间 message.getMessageProperties().setExpiration(1000 * 10 + ""); return message; }); return "{'orderId':'"+orderId+"'}"; } } 
  • 配置消费者 
package com.niu.springbootrabbitmqdelayqueue.listener; import com.niu.springbootrabbitmqdelayqueue.config.RabbitMQConfiguration; import com.niu.springbootrabbitmqdelayqueue.controller.OrderController; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.boot.autoconfigure.amqp.RabbitProperties; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Date; import java.util.Map; import com.rabbitmq.client.Channel; / * @description: 订单失效监听器 * @author: nxq email:  * @createDate: 2020/12/18 8:30 上午 * @updateUser: nxq email:  * @updateDate: 2020/12/18 8:30 上午 * @updateRemark: * @version: 1.0 / @Component public class OrderFailureListener { private static final Logger logger = LoggerFactory.getLogger(OrderFailureListener.class); @RabbitListener( queues = RabbitMQConfiguration.dealQueueOrder //设置订单失效的队列 ) public void process(String order, Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException { logger.info("【订单号】 - [{}]", order); // 判断订单是否已经支付,如果支付则;否则,取消订单(逻辑代码省略) // 手动ack // Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 手动签收 // channel.basicAck(deliveryTag, false); System.out.println("执行结束...."); } } 
  • 配置文件 application.yml 加入rabbitmq的相关配置
spring: rabbitmq: host: localhost port: 5672 username: admin password: admin 
  • 配置启动类
package com.niu.springbootrabbitmqdelayqueue; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class SpringbootRabbitmqDelayQueueApplication { public static void main(String[] args) { SpringApplication.run(SpringbootRabbitmqDelayQueueApplication.class, args); } } 

6.启动测试

启动成功后我们访问一下配置的接口模拟提交订单 http://localhost:8080/order

【延时任务处理、订单失效】RabbitMQ死信队列实现

查看控制台,10秒后

【延时任务处理、订单失效】RabbitMQ死信队列实现

芜湖~起飞?️,大功告成,可以说实时性已经非常高了,可以试着多提交几次订单

打开rabbitMQ的控制台看一下,发现多出来两个队列

【延时任务处理、订单失效】RabbitMQ死信队列实现

使用这种方式不止可以做订单失效,比如说优惠券过期啊等等延时失效问题。可以集群部署rabbitmq,开启消息确认机制。

这种实现方法的基本使用到此就讲完了,完整代码已经推送至github :https://github.com/m/springboot-rabbitmq-delay-queue

转载请注明出处:https://laoniu.blog.csdn.net/article/details/

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/231607.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • PHP调用纯真IP数据库返回具体地址

    PHP调用纯真IP数据库返回具体地址

    2021年10月18日
    81
  • python return换行(python中的换行)

    广告关闭腾讯云11.11云上盛惠,精选热门产品助力上云,云服务器首年88元起,买的越多返的越多,最高返5000元!代码太长怎么办,反斜杠引号‘’来帮忙!在写list或者较长的字符串时候,或者多个循环造成ide不够用时,就需要代码换行了。主要的代码换行有通用的反斜杠和针对字符串起作用的三引号结构。1.反斜杠对于一般表达式来说,反斜杠后直接回车即可实现续行,使用的关键在于反斜杠后不能用空格…

    2022年4月16日
    322
  • linux phy调试方法_php执行shell命令

    linux phy调试方法_php执行shell命令enumphy_state{ PHY_DOWN=0, PHY_STARTING,//1 PHY_READY,//2 PHY_PENDING,//3 PHY_UP,//4 PHY_AN,//5 PHY_RUNNING,//6 PHY_NOLINK,//7 PHY_FORCING,//8 PHY_CHANGELINK,//9 PHY_HALTED,//10…

    2025年5月25日
    2
  • 大学项目总结(一)——基于Android的智能点餐系统

    大学项目总结(一)——基于Android的智能点餐系统这个项目是在我大一下学期开始进行制作,并一直到大一暑假结束完成。一共由我和另一个同学两个人完成,我主要负责前端功能,他主要负责后台下面是主要的图册:…

    2022年6月19日
    29
  • db2top命令详解「建议收藏」

    db2top命令详解「建议收藏」目录1.db2top命令语法2.db2top运行模式2.1交互模式2.2批量模式3.db2top监控模式3.1数据库监控(d)3.2表空间监控(t)3.3动态SQL监控(D)3.4会话监控(l)3.5缓存池监控(b)3.6锁监控(U)3.7表监控(T)3.8瓶颈监控(B)4.其他1.db2top命令语法可使用命令行db2top–h查看,这里就不做赘述了。2.db2top运行模式db2t…

    2025年12月7日
    1
  • 可使用 git 操作的数据库 dolt

    可使用 git 操作的数据库 dolt什么是dolt?Dolt是一个SQL数据库,您可以像git存储库一样分叉、克隆、分支、合并、推送和拉取。像任何MySQL数据库一样连接到Dolt以使用SQL命令运行查询或更新数据,使用Golang语言编写。它与MySQL关系型数据库一样,具有表、视图等概念,支持数据的增删改查等操作。并且它提供了一个命令行工具,完美支持所有的git命令。…

    2025年8月23日
    2

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号