RabbitMQ消息监听异常问题探究「建议收藏」

RabbitMQ消息监听异常问题探究「建议收藏」问题场景在使用SpringRabbitMQ做消息监听时,如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常。为了更好的描述问题,下面写个简单的例子。通过访问null对象来引发空指针异常,消息监听处理程序代码清单:packageamqp;importorg.springframework.amqp.core.Message;importorg.springfram

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE使用 1年只要46元 售后保障 童叟无欺

问题场景

在使用Spring RabbitMQ做消息监听时,如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常。为了更好的描述问题,下面写个简单的例子。

通过访问null对象来引发空指针异常,消息监听处理程序代码清单:

package amqp;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;


@Component
public class FooMessageListener implements MessageListener { 
   

    @Override
    public void onMessage(Message message) { 
   
        String messageBody = new String(message.getBody());
        System.out.println(" [x] Received '" + messageBody + "'");
        String nullStr = null;
        nullStr.toString();
    }
}

往消息监听队列发送一条消息,控制台不停打印异常日志:

18:55:32.816 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:32.979 [pool-1-thread-10] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:32.979 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'Hello, world!' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=directQueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-s5myKVHHeP4FbTGIH0hyeA, consumerQueue=directQueue])
 [x] Received 'Hello, world!'
18:55:38.191 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] WARN org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: null
	at amqp.FooMessageListener.onMessage(FooMessageListener.java:16)
	at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:282)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777)
	... 10 common frames omitted
18:55:38.192 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Rejecting messages (requeue=true)
18:55:38.193 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:38.194 [pool-1-thread-3] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:38.195 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'Hello, world!' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=directQueue, receivedDelay=null, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-s5myKVHHeP4FbTGIH0hyeA, consumerQueue=directQueue])
 [x] Received 'Hello, world!'
18:55:55.226 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] WARN org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: null
	at amqp.FooMessageListener.onMessage(FooMessageListener.java:16)
	at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:282)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777)
	... 10 common frames omitted
18:55:55.226 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Rejecting messages (requeue=true)
18:55:55.227 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:55.229 [pool-1-thread-4] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:55.230 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'Hello, world!' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=directQueue, receivedDelay=null, deliveryTag=3, messageCount=0, consumerTag=amq.ctag-s5myKVHHeP4FbTGIH0hyeA, consumerQueue=directQueue])

问题来了:为什么异常时会一直重复接收这条消息?

抓包验证

消息监听程序异常的过程到底发生了什么?为了一探究竟,笔者使用Wireshark抓包工具来查看消息处理过程。

首先,修改下监听程序,收到字符串exception时产生异常

 @Override
 public void onMessage(Message message) { 
   
     String messageBody = new String(message.getBody());
     System.out.println(" [x] Received '" + messageBody + "'");
     if(messageBody.equals("exception")){ 
   
         String nullStr = null;
         nullStr.toString();
     }
 }

1.监听程序正常处理情况

启动监听程序,Wireshark抓包情况:

启动监听程序

我们主要关注最后一列,这一列展示了请求的AMQP协议方法信息,AMQP协议方法包含类名+方法名+参数,这一列主要展示了类名和方法名,点击对应行可以查看参数信息。比如上图:

  • Connection.Start:请求服务端开始建立连接
  • Channel.Open:请求服务端建立信道
  • Queue.Declare:声明队列
  • Basic.Consume:开始一个消费者,请求指定队列的消息

AMQP协议方法更详细介绍可以查看官网

然后,通过客户端发送一条消息

RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
template.convertAndSend("ok");

抓包:

ack

分析:

  • Basic.Publish: 客户端发送Basic.Publish方法请求,将消息发布到exchange,rabbitmq server会根据路由规则转发到队列中
  • Basic.Deliver: 服务端发送Basic.Deliver方法请求,投递消息到监听队列的客户端消费者
  • Basic.Ack: 客户端发送Basic.Ack方法请求,告知rabbimq server,消息已接收处理

2.监听程序异常处理情况

通过客户端发送exception字符串,制造异常

RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
template.convertAndSend("exception");

抓包:

reject

分析:

  • Basic.Reject: 客户端发送Basic.Reject方法请求,表示无法处理消息,拒绝消息,此时的requeue参数为true,将消息返回原来的队列
  • Basic.Deliver: 服务端调用Basic.Deliver方法,和第一次Basic.Deliver方法不同的是,此时的redeliver参数为true,表示重新投递消息到监听队列的消费者

然后这两步会一直重复下去。对于Basic.Reject方法,可以设置requeue参数为false,这样消息无法处理的时候就不会重新入队了,他会根据异常类型选择直接丢弃或加入dead-letter-exchange中。Spring RabbitMQ配置:

<!--配置监听-->
<rabbit:listener-container connection-factory="connectionFactory" requeue-rejected="false">
    <rabbit:listener ref="fooMessageListener" queue-names="directQueue" />
</rabbit:listener-container>

结论

RabbitMQ消息监听程序异常时,消费者会向rabbitmq server发送Basic.Reject,表示消息拒绝接受,由于Spring默认requeue-rejected配置为true,消息会重新入队,然后rabbitmq server重新投递,造成了程序一直异常的情况。所以说了这么多,我们通过rabbitmq监听消息的时候,程序一定要添加try…catch语句!!!当然你也可以根据实际情况,选择设置requeue-rejected为false来丢弃消息。

参考

[1] AMQP协议方法
[2] Wireshark抓包教程
[3] http://blog.csdn.net/u013256816/article/details/55515234
[4] http://fengchj.com/?p=2234
[5] https://yemengying.com/2017/01/30/how-does-rabbitmq-handle-exception/

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/170742.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • Android常用控件

    Android常用控件TextView显示文本<TextViewandroid:id="@+id/text_view"android:layout_width="match_pa

    2022年7月2日
    20
  • Java自定义注解Annotation详解[通俗易懂]

    简介开发中经常使用到注解,在项目中也偶尔会见到过自定义注解,今天就来探讨一下这个注解是什么鬼,以及注解的作用和自定义注解。列举开发中常见的注解@Override:当重写父类的方法时一般都会在方法上标注上此注解(我们最经常看到的toString()方法上总能看到这货)@Deprecated:用于标记某个方法已经过期,请使用新的方法来替代已经废弃的方法@SuppressWarnings:让编译器或

    2022年4月13日
    64
  • 河北对口计算机录取分数线_河北对口计算机专科院校名单

    河北对口计算机录取分数线_河北对口计算机专科院校名单技校网专门为您推荐的类似问题答案问题1:2009年河北对口计算机高考分数线360问题2:谁能告诉我湖南职高计算机专业对口升学本科和专科分数线湖南省2010年普通高校职高对口招生录取控制分数线代码专业门类本科专科71师范44820072种植54473养殖52374机电56675电子电工50676计算机55877建筑48278旅游48981…

    2022年9月13日
    0
  • Xray的快速使用

    Xray的快速使用快速使用使用基础爬虫爬取并扫描整个网站xraywebscan–basic-crawlerhttp://example.com–html-outputcrawler.html使用HTTP代理进行被动扫描xraywebscan–listen127.0.0.1:7777–html-outputproxy.html设置浏览器http代理为http://127.0.0.1:7777,就可以自动分析代理流量并扫描。如需扫描https流量,请阅读下方文档抓取htt

    2022年5月30日
    66
  • MyBatis+SpringBoot整合 注入SqlSessionTemplate

    MyBatis+SpringBoot整合 注入SqlSessionTemplate实际开发中我们操作数据库持久化,总是需要写重复的mapper,service,xml浪费了我们大量的时间,在这里推荐大家使用SqlSessionTemplate废话不多说直接上代码工具类接口层:packagecom.miaosuan.dao;importjava.util.List;importcom.miaosuan.dao.dbenums.NameSpaceEnum;…

    2022年5月6日
    54
  • jupyter notebook的链接密码 token查询 以及 pycharm 如何使用 jupyter notebook「建议收藏」

    jupyter notebook的链接密码 token查询 以及 pycharm 如何使用 jupyter notebook「建议收藏」目录1、token的查询:2、如何在pycharm中使用jupyternotebook学Python时突然想用jupyternotebook来运行一下代码,好做一下笔记,结果发现要jupyternotebook的token密码,这可苦了我,我怎么可能会记得呢。。。于是上百度搜索一番,有不错的收获,现整理一下:1、token的查询:结合网上查找的和我自己的体会,发现了3种…

    2025年6月18日
    0

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号