订单超时取消之延迟队列

订单超时取消之延迟队列延迟队列的用法 了解一下

什么是延迟队列?

我们先来看一个场景:以淘宝购物为例,当你提交订单之后有30分钟的支付时间,假如你30分钟之后还没有进行支付,订单就会被取消。现在让你来实现这个功能,你准备如何实现?

相信很多小伙伴第一反应就是定时轮询,设定一个定时任务去扫订单数据,一旦发现超过30分钟未支付的订单,就将订单状态update成已取消,这是一种最简单的方法,也是最容易实现的。这种方案的弊端在于:当数据量小时,不会存在问题,当数据量越来越大时,定时扫表会变得越来越慢,而且频繁的扫表会影响下单的效率

延迟队列就是用来解决这一类问题的,那么什么是延迟队列呢?

延迟队列是为了解决任务推迟执行的问题,消息进入延迟队列之后暂时不能被消费,等超过了设定的时间才能被消费者进行消费

可以想像一下这样一种场景,每个任务进入队列的时候都打上了一个时间标签,任务1(10分钟后执行)、任务2(30分钟后执行)、任务3(60分钟后执行),当到了标签对应的时间之后,任务才能被执行

常见的可以使用延迟队列场景:

  • 淘宝下单后,30分钟未支付要取消订单
  • 外卖订单1分钟后,短信提醒客户
  • 3天未评论自动好评

总之,需要延后执行的任务都可以用延迟队列来解决


延迟队列的实现方法

1、DelayQueue

在JDK的java.util.concurrent包中提供了延迟队列的实现DelayQueue,它提供了在指定的时间才能获取队列中元素的功能,队列头的元素是最接近过期时间的元素。如果没有过期元素,使用poll()方法会返回null。下面看代码实现

public class DelayTask implements Delayed { 
     private String msg; private long executeTime; public String getMsg() { 
     return msg; } public void setMsg(String msg) { 
     this.msg = msg; } public long getExecuteTime() { 
     return executeTime; } public void setExecuteTime(long executeTime) { 
     this.executeTime = executeTime; } public DelayTask(long delayTime, String msg){ 
     //到期时间 = 当前时间 + 延迟时间 this.executeTime = System.currentTimeMillis() + delayTime; this.msg = msg; } @Override public long getDelay(TimeUnit unit) { 
     return unit.convert(this.executeTime - System.currentTimeMillis(),TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { 
     return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } } 

DelayQueue中的元素需要实现Delayed接口,我们定义一个任务来实现Delayed接口,主要有两个方法

  • getDelay(TimeUnit unit) 定义了剩余到期时间
  • compareTo(Delayed o) 定义了元素的排序规则
public class DelayQueueTest { 
     public static void main(String[] args){ 
     //创建一个延迟队列 DelayQueue<DelayTask> delayQueue = new DelayQueue<DelayTask>(); DelayQueueTest delayQueueTest = new DelayQueueTest(); //生产者放入消息 delayQueueTest.producer(delayQueue); //消费者消费消息 delayQueueTest.consumer(delayQueue); } //定义一个消费者,启动一个线程,循环从队列中拿元素 private void consumer(DelayQueue<DelayTask> delayTasks){ 
     new Thread(new Runnable() { 
     @Override public void run() { 
     while(true){ 
     try { 
     DelayTask delayTask = delayTasks.take(); System.out.println("消息消费时间:" + getCurrentTime() + ",msg:" + delayTask.getMsg()); } catch (InterruptedException e) { 
     e.printStackTrace(); } } } }).start(); } //生产者,将消息带上延迟时间,放入延迟队列 private void producer(DelayQueue<DelayTask> delayTasks){ 
     DelayTask delayTask = new DelayTask(5000,"delay msg"); System.out.println("消息放入时间:" + getCurrentTime()); delayTasks.add(delayTask); } public static String getCurrentTime(){ 
     Date d = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return sdf.format(d); } } 

如上的代码

  • 生产者放入一个任务,延迟5秒执行
  • 消费者不停的轮询队列,从中拿任务来执行

从输出结果可以看出,任务过了5秒才被消费者拿到,实现了任务的延迟执行

消息放入时间:2020-06-04 00:06:17 消息消费时间:2020-06-04 00:06:22,msg:delay msg 

rabbitMQ死信队列

  • 生产者发送消息给死信Exchange,通过routing-key消息发送到指定的死信队列,此时死信队列是没有消费者的
  • 死信队列中的消息到期后会自动转发到业务Exchange中,通过routing-key消息发送到指定的业务队列中
  • 业务处理的Consumer监控业务队列,取到转发过来的消息进行消费,从而达到延迟队列的效果

下面来看一下代码实现:

首先是rabbitmq的配置

spring: rabbitmq: host: localhost port: 5672 username: admin password: admin 

rabbitmq的Configuration

@Configuration @ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitMQConfig { 
     private String host; private int port; private String username; private String password; public static String DELAY_EXCHANGE = "delay.exchange";//死信Exchange public static String BUSS_EXCHANGE = "buss.exchange";//业务Exchange public static String DELAY_QUEUE = "delay.queue";//死信队列 public static String BUSS_QUEUE = "buss.queue";//业务队列 / * 定义一个死信队列 * @return */ @Bean public Queue delayQueue(){ 
     Map<String,Object> args = new HashMap<String,Object>(); //消息过期后转发的exchange args.put("x-dead-letter-exchange",BUSS_EXCHANGE); //消息过期后转发的routing-key args.put("x-dead-letter-routing-key","delay.msg"); //队列中消息的过期时间(注意消息上也可以设置过期时间),两者若同时设置取其小 args.put("x-message-ttl",20000); return QueueBuilder.durable(DELAY_QUEUE).withArguments(args).build(); } / * 定义普通的业务队列 * @return */ @Bean public Queue bussQueue(){ 
     return new Queue(BUSS_QUEUE,true); } / * 死信Exchange * @return */ @Bean public TopicExchange delayTopicExchange(){ 
     return new TopicExchange(DELAY_EXCHANGE); } / * 业务Exchange * @return */ @Bean public TopicExchange bussTopicExchange(){ 
     return new TopicExchange(BUSS_EXCHANGE); } / * 绑定死信队列与死信Exchange,设置routing-key为queue.delay * @return */ @Bean public Binding bindingDelayExchangeMessage(){ 
     return BindingBuilder.bind(delayQueue()).to(delayTopicExchange()).with("queue.delay"); } / * 绑定业务队列与业务Exchange,设置routing-key为delay.msg * 注意:此处的rounting-key与死信队列的x-dead-letter-routing-key保持一致,才能保证死信消息过期后可以转发到此队列中 * @return */ @Bean public Binding bindingDelayMessage(){ 
     return BindingBuilder.bind(bussQueue()).to(bussTopicExchange()).with("delay.msg"); } @Bean public ConnectionFactory connectionFactory(){ 
     CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port); connectionFactory.setAddresses(host); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); return connectionFactory; } @Bean public RabbitTemplate rabbitTemplate(){ 
     RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); return rabbitTemplate; } public String getHost() { 
     return host; } public void setHost(String host) { 
     this.host = host; } public int getPort() { 
     return port; } public void setPort(int port) { 
     this.port = port; } public String getUsername() { 
     return username; } public void setUsername(String username) { 
     this.username = username; } public String getPassword() { 
     return password; } public void setPassword(String password) { 
     this.password = password; } } 

定义一个消息实体

public class Message implements Serializable { 
     private String key; private String value; public String getKey() { 
     return key; } public void setKey(String key) { 
     this.key = key; } public String getValue() { 
     return value; } public void setValue(String value) { 
     this.value = value; } } 

定义一个Consumer来监控业务队列

@Component @RabbitListener(queues = "buss.queue") public class Consumer { 
     @RabbitHandler public void consumerMessage(Message message){ 
     String key = message.getKey(); String value = message.getValue(); System.out.println("延迟队列消费时间" + getCurrentTime()); System.out.println("消费的消息:" + message.getKey() + "---" + message.getValue()); } public static String getCurrentTime(){ 
     Date d = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return sdf.format(d); } } 

最后写个Controller测试一下

@RestController public class RabbitDemoTest { 
     @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/test") public void send(){ 
     Message message = new Message(); message.setKey("rabbit"); message.setValue("Hello"); System.out.println("消息发送时间:" + Consumer.getCurrentTime()); rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE, "queue.delay", message, new MessagePostProcessor() { 
     @Override public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException { 
     message.getMessageProperties().setContentEncoding("UTF-8"); message.getMessageProperties().setExpiration("20000"); return message; } }); } } 

启动类

@SpringBootApplication public class RabbitmqDemo { 
     public static void main(String[] args){ 
     SpringApplication.run(RabbitmqDemo.class); } } 

我们访问:http://localhost:8080/test

执行结果如下:

消息发送时间:2020-06-04 22:16:01 延迟队列消费时间2020-06-04 22:16:21 消费的消息:rabbit---Hello 

从执行结果可以看出来,是按指定的时间实现了消息的延迟消费

其实延迟队列的实现方式有很多,像时间轮、radis、Quartz、SchedulerX(阿里)等等,都可以实现延迟队列的功能

这些实现方案都各有千秋,我们在实际的项目中要根据情况来选择合适的实现方案,一切的技术方案都是为了解决业务问题

不要为了技术而技术,脱离业务的技术设计是耍流氓!!!

关注公众号,回复“源码333”可免费下载Demo源码
在这里插入图片描述

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/219892.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月17日 下午9:39
下一篇 2026年3月17日 下午9:39


相关推荐

  • Impala与Hive的比較

    Impala与Hive的比較

    2021年12月2日
    45
  • java VM option

    -Xms256m-Xmx256m-XX:MaxNewSize=256m-XX:MaxPermSize=256m

    2022年4月9日
    58
  • ELK框架搭建[通俗易懂]

    ELK框架搭建[通俗易懂]ELK框架搭建

    2022年4月25日
    47
  • intellij idea激活码【2021.8最新】

    (intellij idea激活码)最近有小伙伴私信我,问我这边有没有免费的intellijIdea的激活码,然后我将全栈君台教程分享给他了。激活成功之后他一直表示感谢,哈哈~https://javaforall.net/100143.htmlIntelliJ2021最新激活注册码,破解教程可免费永久激活,亲测有效,上面是详细链接哦~S32P…

    2022年3月26日
    54
  • 决策树模型的用途_决策树模型怎么建立

    决策树模型的用途_决策树模型怎么建立概念定义在特征空间与类空间上的条件概率分布,即给定特征条件下类的条件概率分布;也可以认为是if-then规则的集合优点模型具有可读性,分类速度快。模型首先,介绍一下决策树模型:由结点和有向边组成,结点又可分为内部结点和叶结点。内部结点表示一个特征或属性,叶结点表示一个类。决策树与条件概率分布决策树所表示的条件概率分布由各个单元给定条件下的类的条件概率分布组成。若X表…

    2022年10月21日
    5
  • 海思Hi3798MV310芯片处理器参数介绍「建议收藏」

    海思Hi3798MV310芯片处理器参数介绍「建议收藏」Hi3798MV310是用于IPTV/OTT机顶盒市场的支持4KP60解码的超高清高性能SOC芯片。集成4核64位高性能CortexA53处理器和多核高性能2D/3D加速引擎;支持H.265/AVS24Kx2K@P6010bit超高清视频解码,高性能的H.265高清视频编码,HDR视频解码及显示,HDR转SDR,BT.2020,Dolby和DTS音频处理;内置USB2.0…

    2022年6月28日
    198

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号