RabbitMQ消息监听异常问题探究「建议收藏」

RabbitMQ消息监听异常问题探究「建议收藏」问题场景在使用SpringRabbitMQ做消息监听时,如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常。为了更好的描述问题,下面写个简单的例子。通过访问null对象来引发空指针异常,消息监听处理程序代码清单:packageamqp;importorg.springframework.amqp.core.Message;importorg.springfram

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE使用 1年只要46元 售后保障 童叟无欺

问题场景

在使用Spring RabbitMQ做消息监听时,如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常。为了更好的描述问题,下面写个简单的例子。

通过访问null对象来引发空指针异常,消息监听处理程序代码清单:

package amqp;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;


@Component
public class FooMessageListener implements MessageListener { 
   

    @Override
    public void onMessage(Message message) { 
   
        String messageBody = new String(message.getBody());
        System.out.println(" [x] Received '" + messageBody + "'");
        String nullStr = null;
        nullStr.toString();
    }
}

往消息监听队列发送一条消息,控制台不停打印异常日志:

18:55:32.816 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:32.979 [pool-1-thread-10] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:32.979 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'Hello, world!' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=directQueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-s5myKVHHeP4FbTGIH0hyeA, consumerQueue=directQueue])
 [x] Received 'Hello, world!'
18:55:38.191 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] WARN org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: null
	at amqp.FooMessageListener.onMessage(FooMessageListener.java:16)
	at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:282)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777)
	... 10 common frames omitted
18:55:38.192 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Rejecting messages (requeue=true)
18:55:38.193 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:38.194 [pool-1-thread-3] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:38.195 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'Hello, world!' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=directQueue, receivedDelay=null, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-s5myKVHHeP4FbTGIH0hyeA, consumerQueue=directQueue])
 [x] Received 'Hello, world!'
18:55:55.226 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] WARN org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: null
	at amqp.FooMessageListener.onMessage(FooMessageListener.java:16)
	at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:282)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777)
	... 10 common frames omitted
18:55:55.226 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Rejecting messages (requeue=true)
18:55:55.227 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:55.229 [pool-1-thread-4] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:55.230 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'Hello, world!' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=directQueue, receivedDelay=null, deliveryTag=3, messageCount=0, consumerTag=amq.ctag-s5myKVHHeP4FbTGIH0hyeA, consumerQueue=directQueue])

问题来了:为什么异常时会一直重复接收这条消息?

抓包验证

消息监听程序异常的过程到底发生了什么?为了一探究竟,笔者使用Wireshark抓包工具来查看消息处理过程。

首先,修改下监听程序,收到字符串exception时产生异常

 @Override
 public void onMessage(Message message) { 
   
     String messageBody = new String(message.getBody());
     System.out.println(" [x] Received '" + messageBody + "'");
     if(messageBody.equals("exception")){ 
   
         String nullStr = null;
         nullStr.toString();
     }
 }

1.监听程序正常处理情况

启动监听程序,Wireshark抓包情况:

启动监听程序

我们主要关注最后一列,这一列展示了请求的AMQP协议方法信息,AMQP协议方法包含类名+方法名+参数,这一列主要展示了类名和方法名,点击对应行可以查看参数信息。比如上图:

  • Connection.Start:请求服务端开始建立连接
  • Channel.Open:请求服务端建立信道
  • Queue.Declare:声明队列
  • Basic.Consume:开始一个消费者,请求指定队列的消息

AMQP协议方法更详细介绍可以查看官网

然后,通过客户端发送一条消息

RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
template.convertAndSend("ok");

抓包:

ack

分析:

  • Basic.Publish: 客户端发送Basic.Publish方法请求,将消息发布到exchange,rabbitmq server会根据路由规则转发到队列中
  • Basic.Deliver: 服务端发送Basic.Deliver方法请求,投递消息到监听队列的客户端消费者
  • Basic.Ack: 客户端发送Basic.Ack方法请求,告知rabbimq server,消息已接收处理

2.监听程序异常处理情况

通过客户端发送exception字符串,制造异常

RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
template.convertAndSend("exception");

抓包:

reject

分析:

  • Basic.Reject: 客户端发送Basic.Reject方法请求,表示无法处理消息,拒绝消息,此时的requeue参数为true,将消息返回原来的队列
  • Basic.Deliver: 服务端调用Basic.Deliver方法,和第一次Basic.Deliver方法不同的是,此时的redeliver参数为true,表示重新投递消息到监听队列的消费者

然后这两步会一直重复下去。对于Basic.Reject方法,可以设置requeue参数为false,这样消息无法处理的时候就不会重新入队了,他会根据异常类型选择直接丢弃或加入dead-letter-exchange中。Spring RabbitMQ配置:

<!--配置监听-->
<rabbit:listener-container connection-factory="connectionFactory" requeue-rejected="false">
    <rabbit:listener ref="fooMessageListener" queue-names="directQueue" />
</rabbit:listener-container>

结论

RabbitMQ消息监听程序异常时,消费者会向rabbitmq server发送Basic.Reject,表示消息拒绝接受,由于Spring默认requeue-rejected配置为true,消息会重新入队,然后rabbitmq server重新投递,造成了程序一直异常的情况。所以说了这么多,我们通过rabbitmq监听消息的时候,程序一定要添加try…catch语句!!!当然你也可以根据实际情况,选择设置requeue-rejected为false来丢弃消息。

参考

[1] AMQP协议方法
[2] Wireshark抓包教程
[3] http://blog.csdn.net/u013256816/article/details/55515234
[4] http://fengchj.com/?p=2234
[5] https://yemengying.com/2017/01/30/how-does-rabbitmq-handle-exception/

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/170742.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 到底什么作用

    到底什么作用

    2021年9月22日
    39
  • 关于我转生变成史莱姆这档事_kicker.de

    关于我转生变成史莱姆这档事_kicker.de1  请问什么所kworker进程 清理旧版本的软件缓存:  sudoapt-getautoclean这个进程是干什么的? 我的机器刚装11.04没次卡的时候top一下就发现kworker这个进程占用CPU很大,基本上都能到75%左右youmaytrytodisableallpowersavingcon

    2022年9月25日
    3
  • spi总线协议及spi时序图详解_奔创spi

    spi总线协议及spi时序图详解_奔创spi大家好,我是无际。上个章节我们讲解了spi接口定义,今天我们更加深入讲解下spi协议时序图和spi四种模式的用法。刚开始接触单片机开发时,最怕就是看时序图,对于我来说就是奇怪的知识。特别是SPI和IIC的,以前写程序都直接复制别人程序,功能实现就行了也没去研究过数据传输的时候时序具体是怎么样的。那个时候经验也不足,网上搜的资料说的都太学术化了,也看不懂。后面项目做多了,发现最常用到的通信总线无非就是SPI、IIC、USART、CAN、单口通信。理解也慢慢深刻了,现在去分析时序图也更加

    2022年10月9日
    3
  • javaee框架整合开发入门到实战源码_java底层框架

    javaee框架整合开发入门到实战源码_java底层框架kunJkunJ框架,是基于HK2框架的一个自实现注入框架,功能比较简单,重在探索依赖注入的实现原理。实现细节1.自定义3个注解,Access,Inject,Service2.在Servi

    2022年8月3日
    6
  • linux nginx配置代理_nginx四层负载均衡

    linux nginx配置代理_nginx四层负载均衡3.2下载正向代理模块(这个模块可能不适合其他版本nginx)如果下载失败可用百度云:链接:提取码:下载解压nginxPS:编译,除正向代理模块外,其他看自己需求安装PS:3.4nginx.conf配置正向代理3.5启动nginx检查配置文件启动、停止、重载命令查看端口四、验证4.1202上面配置正向代理4.2对比202和203分别访问https和http##########################################

    2022年10月8日
    2
  • linux 命令行 查找文件_shell查找文件中指定的字符串

    linux 命令行 查找文件_shell查找文件中指定的字符串一grep:查看文件内容,在文件中查询一个关键字,即搜索字符串的命令(在指定的文件中搜索符合条件的字符串)grep是包含匹配,不是完全的精确匹配,特别适合查找内容语法:grep[-option]需要搜索的关键字文件名参数:-n—-连行号一起显示-c—-统计有几行-i—-忽略大小写(一般用的少)-v—-排除指定的字符串(了解),取反,查找出来的内容是搜索条件以外的所有的内容例如:[root@localhostTEST~]#grep-n

    2025年8月25日
    4

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号