文章目录
一、RabbitMQ延迟队列实现
1.1、RabbitMQ延迟队列实现流程

- 生产者生产一条延迟消息,根据延迟时间的不同,利用不同的routing-key将消息路由到不同的延迟队列,每个队列都设置了不同的 TTL 属性 ( TTL ( Time To Live ) 生存时间 ),并绑定到同一个死信交换机中。
- 消息过期后,根据routing-key的不同,又会被死信交换机路由到不同的死信队列中,消费者只需要监听对应的死信队列进行消费即可。
1.2、配置RabbitMQ连接
#[ RabbitMQ相关配置 ] #rabbitmq服务器IP spring.rabbitmq.host=安装RabbitMQ的服务器IP #rabbitmq服务器端口(默认为5672) spring.rabbitmq.port=5672 #用户名 spring.rabbitmq.username=guest #用户密码 spring.rabbitmq.password=guest #虚拟主机(一个RabbitMQ服务可以配置多个虚拟主机,每一个虚拟机主机之间是相互隔离,相互独立的,授权用户到指定的virtual-host就可以发送消息到指定队列) #vhost虚拟主机地址( 默认为/ ) spring.rabbitmq.virtual-host=/
1.3、创建配置类,配置两个交换机、四个队列、以及根据路由键配置交换机和队列的绑定关系
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitMQConfiguration {
//延迟交换机 public static final String DELAY_EXCHANGE = "delay_exchange"; //延迟队列A public static final String DELAY_QUEUE_A = "delay_queue_a"; //延迟队列B public static final String DELAY_QUEUE_B = "delay_queue_b"; //延迟路由键10S public static final String DELAY_QUEUE_10S_ROUTING_KEY = "delay_queue_10s_routing_key"; //延迟路由键60S public static final String DELAY_QUEUE_60S_ROUTING_KEY = "delay_queue_60s_routing_key"; //死信交换机 public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange"; //死信队列A public static final String DEAD_LETTER_QUEUE_A = "dead_letter_queue_a"; //死信队列B public static final String DEAD_LETTER_QUEUE_B = "dead_letter_queue_b"; //死信路由键10S public static final String DEAD_LETTER_QUEUE_10S_ROUTING_KEY = "dead_letter_queue_10s_routing_key"; //死信路由键60S public static final String DEAD_LETTER_QUEUE_60S_ROUTING_KEY = "dead_letter_queue_60s_routing_key"; //延迟交换机 @Bean("delayExchange") public DirectExchange delayExchange(){
return new DirectExchange(DELAY_EXCHANGE, true, false); } //延迟队列A @Bean("delayQueueA") public Queue delayQueueA(){
Map<String, Object> args = new HashMap<>(); //设置延迟队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); //设置延迟队列绑定的死信路由键 args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_10S_ROUTING_KEY); //设置延迟队列的 TTL 消息存活时间 args.put("x-message-ttl", 10*1000); return new Queue(DELAY_QUEUE_A, true, false, false, args); } //延迟队列B @Bean("delayQueueB") public Queue delayQueueB(){
Map<String, Object> args = new HashMap<>(); //设置延迟队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); //设置延迟队列绑定的死信路由键 args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_60S_ROUTING_KEY); //设置延迟队列的 TTL 消息存活时间 args.put("x-message-ttl", 60*1000); return new Queue(DELAY_QUEUE_B, true, false, false, args); } //延迟队列A的绑定关系 @Bean("delayBindingA") public Binding delayBindingA(@Qualifier("delayQueueA")Queue queue, @Qualifier("delayExchange")DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_10S_ROUTING_KEY); } //延迟队列B的绑定关系 @Bean("delayBindingB") public Binding delayBindingB(@Qualifier("delayQueueB")Queue queue, @Qualifier("delayExchange")DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_60S_ROUTING_KEY); } //死信交换机 @Bean("deadLetterExchange") public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false); } //死信队列A @Bean("deadLetterQueueA") public Queue deadLetterQueueA(){
return new Queue(DEAD_LETTER_QUEUE_A, true); } //死信队列B @Bean("deadLetterQueueB") public Queue deadLetterQueueB(){
return new Queue(DEAD_LETTER_QUEUE_B, true); } //死信队列A的绑定关系 @Bean("deadLetterBindingA") public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA")Queue queue, @Qualifier("deadLetterExchange")DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_10S_ROUTING_KEY); } //死信队列B的绑定关系 @Bean("deadLetterBindingB") public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB")Queue queue, @Qualifier("deadLetterExchange")DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_60S_ROUTING_KEY); } }
1.4、创建一个枚举类来配置延迟类型
@Getter @AllArgsConstructor public enum DelayTypeEnum {
//10s DELAY_10s(1), //60s DELAY_60s(2); private Integer type; / * 延迟类型 * @param type * @return 延迟类型 */ public static DelayTypeEnum getDelayTypeEnum(Integer type){
if(Objects.equals(type, DELAY_10s.type)){
return DELAY_10s; } if(Objects.equals(type, DELAY_60s.type)){
return DELAY_60s; } return null; } }
1.5、创建生产者类发送消息
import com.cd.springbootrabbitmq.enums.DelayTypeEnum; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_EXCHANGE; import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_QUEUE_10S_ROUTING_KEY; import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_QUEUE_60S_ROUTING_KEY; / * 延迟消息生产者 */ @Component public class DelayMessageProducer {
@Autowired private RabbitTemplate rabbitTemplate; / * 发送延迟消息 * @param message 要发送的消息 * @param type 延迟类型(延时10s的延迟队列 或 延时60s的延迟队列) */ public void sendDelayMessage(String message, DelayTypeEnum type){
switch (type){
case DELAY_10s: rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_10S_ROUTING_KEY, message); break; case DELAY_60s: rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_60S_ROUTING_KEY, message); break; default: break; } } }
1.6、创建消费者类消费消息
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_A; import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_B; @Slf4j @Component public class DeadLetterQueueConsumer {
@Autowired private RabbitTemplate rabbitTemplate; / * 监听死信队列A * @param message 接收的信息 */ //@RabbitListener(queues = "dead_letter_queue_a") @RabbitListener(queues = DEAD_LETTER_QUEUE_A) public void receiveA(Message message) {
String msg = new String(message.getBody()); // 记录日志 log.info("当前时间:{},死信队列A收到的消息:{}", LocalDateTime.now(), msg); } / * 监听死信队列B * @param message 接收的信息 */ //@RabbitListener(queues = "dead_letter_queue_b") @RabbitListener(queues = DEAD_LETTER_QUEUE_B) public void receiveB(Message message){
String msg = new String(message.getBody()); // 记录日志 log.info("当前时间:{},死信队列B收到的消息:{}", LocalDateTime.now(), msg); } }
1.7、创建控制类
import com.cd.springbootrabbitmq.enums.DelayTypeEnum; import com.cd.springbootrabbitmq.producer.DelayMessageProducer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.util.Objects; @Slf4j @RestController @RequestMapping("/rabbitmq") public class RabbitMQController {
@Autowired private DelayMessageProducer producer; @RequestMapping("/send") public void send(String message, Integer delayType){
// 记录日志 log.info("当前时间:{},消息:{},延迟类型:{}", LocalDateTime.now(), message, delayType); // 发送延迟消息 producer.sendDelayMessage(message, Objects.requireNonNull(DelayTypeEnum.getDelayTypeEnum(delayType))); } }
1.8、测试
在浏览器中先后提交下面两个请求:
1)localhost:8080/rabbitmq/send?message=测试自定义延迟处理60s&delayType=2
2)localhost:8080/rabbitmq/send?message=测试自定义延迟处理10s&delayType=1
查看idea控制台:

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/229224.html原文链接:https://javaforall.net
