springboot rabbitmq 之死信队列(延迟消费消息)「建议收藏」

springboot rabbitmq 之死信队列(延迟消费消息)

大家好,又见面了,我是全栈君。

之前探讨了springboot 集成 rabbitmq  以及开启ack模式   

传送门:https://my.oschina.net/u/2948566/blog/1624963

接着该篇 搞一下 死信队列

  • 概念

死信队列 听上去像 消息“死”了     其实也有点这个意思,死信队列  是 当消息在一个队列 因为下列原因:

  1. 消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false
  2. 消息超期 (rabbitmq  Time-To-Live -> messageProperties.setExpiration()) 
  3. 队列超载

变成了 “死信” 后    被重新投递(publish)到另一个Exchange   该Exchange 就是DLX     然后该Exchange 根据绑定规则 转发到对应的 队列上  监听该队列  就可以重新消费     说白了 就是  没有被消费的消息  换个地方重新被消费

              生产者   –>  消息 –> 交换机  –> 队列  –> 变成死信  –> DLX交换机 –>队列 –> 消费者

  • springboot rabbitmq 死信队列实践

下面我们模拟一个死信队列的应用场景   消息延时处理

还是以这个项目为基础: https://gitee.com/felord/springboot-message

 项目中 RabbitConfig  死信相关片段:

   /**
     * 死信队列跟交换机类型没有关系 不一定为directExchange  不影响该类型交换机的特性.
     *
     * @return the exchange
     */
    @Bean("deadLetterExchange")
    public Exchange deadLetterExchange() {
        return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
    }

    /**
     * 声明一个死信队列.
     * x-dead-letter-exchange   对应  死信交换机
     * x-dead-letter-routing-key  对应 死信队列
     *
     * @return the queue
     */
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    声明  死信交换机
        args.put(DEAD_LETTER_QUEUE_KEY, "DL_EXCHANGE");
//       x-dead-letter-routing-key    声明 死信路由键
        args.put(DEAD_LETTER_ROUTING_KEY, "KEY_R");
        return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
    }

    /**
     * 定义死信队列转发队列.
     *
     * @return the queue
     */
    @Bean("redirectQueue")
    public Queue redirectQueue() {
        return QueueBuilder.durable("REDIRECT_QUEUE").build();
    }

    /**
     * 死信路由通过 DL_KEY 绑定键绑定到死信队列上.
     *
     * @return the binding
     */
    @Bean
    public Binding deadLetterBinding() {
        return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);

    }

    /**
     * 死信路由通过 KEY_R 绑定键绑定到死信队列上.
     *
     * @return the binding
     */
    @Bean
    public Binding redirectBinding() {
        return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null);
    }

说明: 

deadLetterExchange()声明了一个Direct 类型的Exchange (死信队列跟交换机没有关系

deadLetterQueue() 声明了一个队列   这个队列 跟前面我们声明的队列不一样    注入了 Map<String,Object> 参数    下面的概念非常重要   

x-dead-letter-exchange 来标识一个交换机  x-dead-letter-routing-key  来标识一个绑定键(RoutingKey)  这个绑定键 是分配给 标识的交换机的   如果没有特殊指定 声明队列的原routingkey , 如果有队列通过此绑定键 绑定到交换机    那么死信会被该交换机转发到 该队列上  通过监听 可对消息进行消费  

可以打个比方  这个是为主力队员 设置了一个替补   如果主力 “死”了   他的活 替补接手  这样更好理解

deadLetterBinding() 对这个带参队列 进行了 和交换机的规则绑定   等下 消费者 先把消息通过交换机投递到该队列中去   然后制造条件发生“死信” 

redirectBinding() 我们需要给标识的交换机  以及对其指定的routingkey 来绑定一个所谓的“替补”队列 用来监听

 

流程具体是  消息投递到  DL_QUEUE  10秒后消息过期 生成死信    然后转发到 REDIRECT_QUEUE  通过对其的监听  来消费消息     

SendController 增加消费发送接口 

    /**
     * 测试死信队列.
     *
     * @param p the p
     * @return the response entity
     */
    @RequestMapping("/dead")
    public ResponseEntity deadLetter(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//        声明消息处理器  这个对消息进行处理  可以设置一些参数   对消息进行一些定制化处理   我们这里  来设置消息的编码  以及消息的过期时间  因为在.net 以及其他版本过期时间不一致   这里的时间毫秒值 为字符串
        MessagePostProcessor messagePostProcessor = message -> {
            MessageProperties messageProperties = message.getMessageProperties();
//            设置编码
            messageProperties.setContentEncoding("utf-8");
//            设置过期时间10*1000毫秒
            messageProperties.setExpiration("10000");
            return message;
        };
//         向DL_QUEUE 发送消息  10*1000毫秒后过期 形成死信
        rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", p, messagePostProcessor, correlationData);
        return ResponseEntity.ok();
    }

 

监听 REDIRECT_QUEUE 

    /**
     * 监听替补队列 来验证死信.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception  这里异常需要处理
     */
    @RabbitListener(queues = {"REDIRECT_QUEUE"})
    public void redirect(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        log.debug("dead message  10s 后 消费消息 {}",new String (message.getBody()));
    }

 

测试死信队列接口

springboot rabbitmq 之死信队列(延迟消费消息)「建议收藏」

不出意外  消息会在发出10秒后 才被消费     一下信息证实了这一猜测

springboot rabbitmq 之死信队列(延迟消费消息)「建议收藏」

 

相关源码:https://gitee.com/felord/springboot-message      希望大家点个赞

转载于:https://my.oschina.net/10000000000/blog/1626278

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/107694.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 微信表情符号已写入判决书怎么删除_微信100个表情符号含义

    微信表情符号已写入判决书怎么删除_微信100个表情符号含义本文转载自IT之家,IT之家3月3日消息据法治日报报道,如今表情符号已经成为呈堂证供的一部分,登上多地法院判决书等司法文书。报道称,在广东省深圳市中级人民法院发布的一份民事判决书中,对一起涉及微信表情的合同纠纷案作出了判决。作为创作者的卢泓于2018年4月24日向深圳市道一影业集团有限公司法定代表人田民发送其创作的歌词。后者表明其在路上后,发送了微信表情符号[强]。“结合双方的前后聊天内容,一审法院认为该微信表情符号[强]并非是对卢泓歌词的认可,而是属于礼貌性回复,不能

    2022年10月22日
    0
  • pywin32、win32api、win32gui、win32com、win32con 都是啥?「建议收藏」

    pywin32、win32api、win32gui、win32com、win32con 都是啥?「建议收藏」pywin32、win32api、win32gui、win32com、win32con名称非常类似,特别容易混淆,今天就用600字给大家区分一下文章目录pywin32win32guiwin32conwin32apiwin32com记录时间pywin32pywin32主要的作用是供Python开发者快速调用WindowsAPI的一个模块库。该模块的另一个作用是是通过Python进行COM编程。落地场景:如果你想在Windows操作系统用Python实现自动化工作,pywin32模块经常用到

    2022年10月11日
    0
  • poe交换机连接方式_路由器接交换机怎么设置

    poe交换机连接方式_路由器接交换机怎么设置POE也被称为基于局域网的供电系统或有源以太网,有时也被简称为以太网供电,一个完整的POE系统包括供电端设备和受电端设备两部分。可能会有一些朋友对poe供电有一些疑问,这个在之前也有很多朋友问到过,那么,今天就由飞畅科技的小编来用图文为大家详细介绍下poe的几种供电方式和连接方法,感兴趣的朋友就一起来看看吧!poe交换机的4种连接方式一、交换机和终端都支持PoE这种方法PoE交换机直接通过网线接到支持PoE供电的无线AP和网络摄像机上,这种方…

    2022年10月4日
    0
  • 在CentOS7上安装ftp服务器用于保存服务端上传的图片。

    在CentOS7上安装ftp服务器用于保存服务端上传的图片。

    2021年10月19日
    38
  • Nginx 配置 HTTPS 完整过程[通俗易懂]

    Nginx 配置 HTTPS 完整过程[通俗易懂]配置站点使用https,并且将http重定向至https。1.nginx的ssl模块安装查看nginx是否安装http_ssl_module模块。$/usr/local/nginx/sbin/nginx-V![在这里插入图片描述](https://img-blog.csdnimg.cn/2018112610054317.png)如果出现configure…

    2022年7月14日
    13
  • IOS越狱学习总结

    IOS越狱学习总结iOS越狱iOS越狱(iOSJailbreaking),是用于获取苹果公司便携装置操作系统iOS最高权限的一种技术手段,用户使用这种技术及软件可以获取到iOS的最高权限,甚至可能可以进一步解开运营商对手机网络的限制。中文名称iOS越狱外文名称iOSJailbreaking应  用电子产品类  型软件目录1简介2用途3针对的设备

    2022年7月26日
    5

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号