springboot rabbitmq 之死信队列(延迟消费消息)「建议收藏」

springboot rabbitmq 之死信队列(延迟消费消息)

大家好,又见面了,我是全栈君。

之前探讨了springboot 集成 rabbitmq  以及开启ack模式   

传送门:https://my.oschina.net/u/2948566/blog/1624963

接着该篇 搞一下 死信队列

  • 概念

死信队列 听上去像 消息“死”了     其实也有点这个意思,死信队列  是 当消息在一个队列 因为下列原因:

  1. 消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false
  2. 消息超期 (rabbitmq  Time-To-Live -> messageProperties.setExpiration()) 
  3. 队列超载

变成了 “死信” 后    被重新投递(publish)到另一个Exchange   该Exchange 就是DLX     然后该Exchange 根据绑定规则 转发到对应的 队列上  监听该队列  就可以重新消费     说白了 就是  没有被消费的消息  换个地方重新被消费

              生产者   –>  消息 –> 交换机  –> 队列  –> 变成死信  –> DLX交换机 –>队列 –> 消费者

  • springboot rabbitmq 死信队列实践

下面我们模拟一个死信队列的应用场景   消息延时处理

还是以这个项目为基础: https://gitee.com/felord/springboot-message

 项目中 RabbitConfig  死信相关片段:

   /**
     * 死信队列跟交换机类型没有关系 不一定为directExchange  不影响该类型交换机的特性.
     *
     * @return the exchange
     */
    @Bean("deadLetterExchange")
    public Exchange deadLetterExchange() {
        return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
    }

    /**
     * 声明一个死信队列.
     * x-dead-letter-exchange   对应  死信交换机
     * x-dead-letter-routing-key  对应 死信队列
     *
     * @return the queue
     */
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    声明  死信交换机
        args.put(DEAD_LETTER_QUEUE_KEY, "DL_EXCHANGE");
//       x-dead-letter-routing-key    声明 死信路由键
        args.put(DEAD_LETTER_ROUTING_KEY, "KEY_R");
        return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
    }

    /**
     * 定义死信队列转发队列.
     *
     * @return the queue
     */
    @Bean("redirectQueue")
    public Queue redirectQueue() {
        return QueueBuilder.durable("REDIRECT_QUEUE").build();
    }

    /**
     * 死信路由通过 DL_KEY 绑定键绑定到死信队列上.
     *
     * @return the binding
     */
    @Bean
    public Binding deadLetterBinding() {
        return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);

    }

    /**
     * 死信路由通过 KEY_R 绑定键绑定到死信队列上.
     *
     * @return the binding
     */
    @Bean
    public Binding redirectBinding() {
        return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null);
    }

说明: 

deadLetterExchange()声明了一个Direct 类型的Exchange (死信队列跟交换机没有关系

deadLetterQueue() 声明了一个队列   这个队列 跟前面我们声明的队列不一样    注入了 Map<String,Object> 参数    下面的概念非常重要   

x-dead-letter-exchange 来标识一个交换机  x-dead-letter-routing-key  来标识一个绑定键(RoutingKey)  这个绑定键 是分配给 标识的交换机的   如果没有特殊指定 声明队列的原routingkey , 如果有队列通过此绑定键 绑定到交换机    那么死信会被该交换机转发到 该队列上  通过监听 可对消息进行消费  

可以打个比方  这个是为主力队员 设置了一个替补   如果主力 “死”了   他的活 替补接手  这样更好理解

deadLetterBinding() 对这个带参队列 进行了 和交换机的规则绑定   等下 消费者 先把消息通过交换机投递到该队列中去   然后制造条件发生“死信” 

redirectBinding() 我们需要给标识的交换机  以及对其指定的routingkey 来绑定一个所谓的“替补”队列 用来监听

 

流程具体是  消息投递到  DL_QUEUE  10秒后消息过期 生成死信    然后转发到 REDIRECT_QUEUE  通过对其的监听  来消费消息     

SendController 增加消费发送接口 

    /**
     * 测试死信队列.
     *
     * @param p the p
     * @return the response entity
     */
    @RequestMapping("/dead")
    public ResponseEntity deadLetter(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//        声明消息处理器  这个对消息进行处理  可以设置一些参数   对消息进行一些定制化处理   我们这里  来设置消息的编码  以及消息的过期时间  因为在.net 以及其他版本过期时间不一致   这里的时间毫秒值 为字符串
        MessagePostProcessor messagePostProcessor = message -> {
            MessageProperties messageProperties = message.getMessageProperties();
//            设置编码
            messageProperties.setContentEncoding("utf-8");
//            设置过期时间10*1000毫秒
            messageProperties.setExpiration("10000");
            return message;
        };
//         向DL_QUEUE 发送消息  10*1000毫秒后过期 形成死信
        rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", p, messagePostProcessor, correlationData);
        return ResponseEntity.ok();
    }

 

监听 REDIRECT_QUEUE 

    /**
     * 监听替补队列 来验证死信.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception  这里异常需要处理
     */
    @RabbitListener(queues = {"REDIRECT_QUEUE"})
    public void redirect(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        log.debug("dead message  10s 后 消费消息 {}",new String (message.getBody()));
    }

 

测试死信队列接口

springboot rabbitmq 之死信队列(延迟消费消息)「建议收藏」

不出意外  消息会在发出10秒后 才被消费     一下信息证实了这一猜测

springboot rabbitmq 之死信队列(延迟消费消息)「建议收藏」

 

相关源码:https://gitee.com/felord/springboot-message      希望大家点个赞

转载于:https://my.oschina.net/10000000000/blog/1626278

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/107694.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 智能视频识别技术的发展现状「建议收藏」

    智能视频识别技术的发展现状「建议收藏」一、智能视频分析技术应用现状  作为强化视频监控系统应用的一门主要技术——视频智能分析技术,近几年一直得到业界的广泛的关注,其通过对视频内容的分析,将客户所关注的目标从监控背景中分离出来,按照目标的移动方向、速度、时间等参数和某些行为特征进行关联,从而达到主动监控防御的目的。按说这一技术的大规模推广应用对于提高当前治安监控系统的利用效率将起到很大的作用,但实际上却没有得到有效的推广,所谓“叫好不叫座”。笔者认为,造成这一现象的主要原因有以下几个方面:【您可以是大型系统集成商、可以是相关贸易的经销商.

    2022年4月30日
    77
  • Pycharm提示 Unresolved reference 的解决办法[通俗易懂]

    Pycharm提示 Unresolved reference 的解决办法[通俗易懂]有时候a.py和b.py在一个目录里面,但是在a.py种写importb有时会提示Unresolvedreference,Pycharm常见,解决办法是setting->Project->Projectstructure->Source,点击要加入的文件夹.注意:添加成功之后该文件见的图标会编程蓝色(加入之前是淡蓝色中间一个圆圈)如上图所示。…

    2022年8月27日
    4
  • ScaleAnimation开始结束位置分析[通俗易懂]

    ScaleAnimation开始结束位置分析[通俗易懂]做项目的时候,需要用到动画,大小和位置都不一样。刚开始想到的是ScaleAnimation和TranslateAnimation进行组合,但实验后发现,目标位置始终不对,只用TranslateAnimation是没有问题,所以ScaleAnimation应该不只是进行了缩放经过查找资料,发现ScaleAnimation还进行起始位置的移动。ScaleAnimation分为两种情况,从本身的位置…

    2022年10月15日
    1
  • db2中You can’t specify target table for update in FROM clause错误

    db2中You can’t specify target table for update in FROM clause错误db2中You can’t specify target table for update in FROM clause错误

    2022年4月23日
    66
  • 解决springboot 2.0集成elasticsearch 7.6.2 查询总数为10000

    解决springboot 2.0集成elasticsearch 7.6.2 查询总数为10000小伙伴们,你们好,我是老寇据查询相关资料,在elasticsearch7.x以后的版本,当查询的结果总数大于1万时,默认total返回总数为10000在kibana获取真实总数,只需要加添加track_total_hits参数{“query”:{“match_all”:{}},”track_total_hits”:true}在springboot项目中,增加配置//获取真实总数searchSourceBuilder.trackTotalHit

    2022年5月25日
    42

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号