什么是延迟队列?
我们先来看一个场景:以淘宝购物为例,当你提交订单之后有30分钟的支付时间,假如你30分钟之后还没有进行支付,订单就会被取消。现在让你来实现这个功能,你准备如何实现?
相信很多小伙伴第一反应就是定时轮询,设定一个定时任务去扫订单数据,一旦发现超过30分钟未支付的订单,就将订单状态update成已取消,这是一种最简单的方法,也是最容易实现的。这种方案的弊端在于:当数据量小时,不会存在问题,当数据量越来越大时,定时扫表会变得越来越慢,而且频繁的扫表会影响下单的效率
延迟队列就是用来解决这一类问题的,那么什么是延迟队列呢?
延迟队列是为了解决任务推迟执行的问题,消息进入延迟队列之后暂时不能被消费,等超过了设定的时间才能被消费者进行消费
可以想像一下这样一种场景,每个任务进入队列的时候都打上了一个时间标签,任务1(10分钟后执行)、任务2(30分钟后执行)、任务3(60分钟后执行),当到了标签对应的时间之后,任务才能被执行
常见的可以使用延迟队列场景:
- 淘宝下单后,30分钟未支付要取消订单
- 外卖订单1分钟后,短信提醒客户
- 3天未评论自动好评
总之,需要延后执行的任务都可以用延迟队列来解决
延迟队列的实现方法
1、DelayQueue
在JDK的java.util.concurrent包中提供了延迟队列的实现DelayQueue,它提供了在指定的时间才能获取队列中元素的功能,队列头的元素是最接近过期时间的元素。如果没有过期元素,使用poll()方法会返回null。下面看代码实现
public class DelayTask implements Delayed {
private String msg; private long executeTime; public String getMsg() {
return msg; } public void setMsg(String msg) {
this.msg = msg; } public long getExecuteTime() {
return executeTime; } public void setExecuteTime(long executeTime) {
this.executeTime = executeTime; } public DelayTask(long delayTime, String msg){
//到期时间 = 当前时间 + 延迟时间 this.executeTime = System.currentTimeMillis() + delayTime; this.msg = msg; } @Override public long getDelay(TimeUnit unit) {
return unit.convert(this.executeTime - System.currentTimeMillis(),TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) {
return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } }
DelayQueue中的元素需要实现Delayed接口,我们定义一个任务来实现Delayed接口,主要有两个方法
- getDelay(TimeUnit unit) 定义了剩余到期时间
- compareTo(Delayed o) 定义了元素的排序规则
public class DelayQueueTest {
public static void main(String[] args){
//创建一个延迟队列 DelayQueue<DelayTask> delayQueue = new DelayQueue<DelayTask>(); DelayQueueTest delayQueueTest = new DelayQueueTest(); //生产者放入消息 delayQueueTest.producer(delayQueue); //消费者消费消息 delayQueueTest.consumer(delayQueue); } //定义一个消费者,启动一个线程,循环从队列中拿元素 private void consumer(DelayQueue<DelayTask> delayTasks){
new Thread(new Runnable() {
@Override public void run() {
while(true){
try {
DelayTask delayTask = delayTasks.take(); System.out.println("消息消费时间:" + getCurrentTime() + ",msg:" + delayTask.getMsg()); } catch (InterruptedException e) {
e.printStackTrace(); } } } }).start(); } //生产者,将消息带上延迟时间,放入延迟队列 private void producer(DelayQueue<DelayTask> delayTasks){
DelayTask delayTask = new DelayTask(5000,"delay msg"); System.out.println("消息放入时间:" + getCurrentTime()); delayTasks.add(delayTask); } public static String getCurrentTime(){
Date d = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return sdf.format(d); } }
如上的代码
- 生产者放入一个任务,延迟5秒执行
- 消费者不停的轮询队列,从中拿任务来执行
从输出结果可以看出,任务过了5秒才被消费者拿到,实现了任务的延迟执行
消息放入时间:2020-06-04 00:06:17 消息消费时间:2020-06-04 00:06:22,msg:delay msg
rabbitMQ死信队列
- 生产者发送消息给死信Exchange,通过routing-key消息发送到指定的死信队列,此时死信队列是没有消费者的
- 死信队列中的消息到期后会自动转发到业务Exchange中,通过routing-key消息发送到指定的业务队列中
- 业务处理的Consumer监控业务队列,取到转发过来的消息进行消费,从而达到延迟队列的效果
下面来看一下代码实现:
首先是rabbitmq的配置
spring: rabbitmq: host: localhost port: 5672 username: admin password: admin
rabbitmq的Configuration
@Configuration @ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitMQConfig {
private String host; private int port; private String username; private String password; public static String DELAY_EXCHANGE = "delay.exchange";//死信Exchange public static String BUSS_EXCHANGE = "buss.exchange";//业务Exchange public static String DELAY_QUEUE = "delay.queue";//死信队列 public static String BUSS_QUEUE = "buss.queue";//业务队列 / * 定义一个死信队列 * @return */ @Bean public Queue delayQueue(){
Map<String,Object> args = new HashMap<String,Object>(); //消息过期后转发的exchange args.put("x-dead-letter-exchange",BUSS_EXCHANGE); //消息过期后转发的routing-key args.put("x-dead-letter-routing-key","delay.msg"); //队列中消息的过期时间(注意消息上也可以设置过期时间),两者若同时设置取其小 args.put("x-message-ttl",20000); return QueueBuilder.durable(DELAY_QUEUE).withArguments(args).build(); } / * 定义普通的业务队列 * @return */ @Bean public Queue bussQueue(){
return new Queue(BUSS_QUEUE,true); } / * 死信Exchange * @return */ @Bean public TopicExchange delayTopicExchange(){
return new TopicExchange(DELAY_EXCHANGE); } / * 业务Exchange * @return */ @Bean public TopicExchange bussTopicExchange(){
return new TopicExchange(BUSS_EXCHANGE); } / * 绑定死信队列与死信Exchange,设置routing-key为queue.delay * @return */ @Bean public Binding bindingDelayExchangeMessage(){
return BindingBuilder.bind(delayQueue()).to(delayTopicExchange()).with("queue.delay"); } / * 绑定业务队列与业务Exchange,设置routing-key为delay.msg * 注意:此处的rounting-key与死信队列的x-dead-letter-routing-key保持一致,才能保证死信消息过期后可以转发到此队列中 * @return */ @Bean public Binding bindingDelayMessage(){
return BindingBuilder.bind(bussQueue()).to(bussTopicExchange()).with("delay.msg"); } @Bean public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port); connectionFactory.setAddresses(host); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); return connectionFactory; } @Bean public RabbitTemplate rabbitTemplate(){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); return rabbitTemplate; } public String getHost() {
return host; } public void setHost(String host) {
this.host = host; } public int getPort() {
return port; } public void setPort(int port) {
this.port = port; } public String getUsername() {
return username; } public void setUsername(String username) {
this.username = username; } public String getPassword() {
return password; } public void setPassword(String password) {
this.password = password; } }
定义一个消息实体
public class Message implements Serializable {
private String key; private String value; public String getKey() {
return key; } public void setKey(String key) {
this.key = key; } public String getValue() {
return value; } public void setValue(String value) {
this.value = value; } }
定义一个Consumer来监控业务队列
@Component @RabbitListener(queues = "buss.queue") public class Consumer {
@RabbitHandler public void consumerMessage(Message message){
String key = message.getKey(); String value = message.getValue(); System.out.println("延迟队列消费时间" + getCurrentTime()); System.out.println("消费的消息:" + message.getKey() + "---" + message.getValue()); } public static String getCurrentTime(){
Date d = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return sdf.format(d); } }
最后写个Controller测试一下
@RestController public class RabbitDemoTest {
@Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/test") public void send(){
Message message = new Message(); message.setKey("rabbit"); message.setValue("Hello"); System.out.println("消息发送时间:" + Consumer.getCurrentTime()); rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE, "queue.delay", message, new MessagePostProcessor() {
@Override public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {
message.getMessageProperties().setContentEncoding("UTF-8"); message.getMessageProperties().setExpiration("20000"); return message; } }); } }
启动类
@SpringBootApplication public class RabbitmqDemo {
public static void main(String[] args){
SpringApplication.run(RabbitmqDemo.class); } }
我们访问:http://localhost:8080/test
执行结果如下:
消息发送时间:2020-06-04 22:16:01 延迟队列消费时间2020-06-04 22:16:21 消费的消息:rabbit---Hello
从执行结果可以看出来,是按指定的时间实现了消息的延迟消费
其实延迟队列的实现方式有很多,像时间轮、radis、Quartz、SchedulerX(阿里)等等,都可以实现延迟队列的功能
这些实现方案都各有千秋,我们在实际的项目中要根据情况来选择合适的实现方案,一切的技术方案都是为了解决业务问题
不要为了技术而技术,脱离业务的技术设计是耍流氓!!!
关注公众号,回复“源码333”可免费下载Demo源码

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/219892.html原文链接:https://javaforall.net
