RabbitMQ入门:Hello RabbitMQ 代码实例[通俗易懂]

在之前的一篇博客RabbitMQ入门:认识并安装RabbitMQ(以Windows系统为例)中,我们安装了RabbitMQ并且对其也有的初步的认识,今天就来写个入门小例子来加深概念理解并了解代码怎么实

大家好,又见面了,我是全栈君。

在之前的一篇博客RabbitMQ入门:认识并安装RabbitMQ(以Windows系统为例)中,我们安装了RabbitMQ并且对其也有的初步的认识,今天就来写个入门小例子来加深概念理解并了解代码怎么实现。

本篇博客围绕下面几个方面展开:

  1. 代码前的理论热身
  2. 代码实例:Hello RabbitMQ
  3. 运行代码并调试问题

Now, Let’s begin !

一、代码前的理论热身

我们来看张图:

RabbitMQ入门:Hello RabbitMQ 代码实例[通俗易懂]

Publisher(生产者)生成消息,然后publish(发布)消息到exchange(路由器,也有资料翻译成交换机),然后根据路由规则将消息传递到Queue(队列),最终交由Consumer(消费者)进行消费处理。

这里的生产者和消费者都是我们的应用,因此我们的代码中要实现这两个部分。

中间的节点就是RabbitMQ 提供的内容,需要再生产者和消费者里面调用其接口来定义和使用这些节点。

 

二、代码实例:Hello RabbitMQ

  1. 首先来实现生产者,这里我没有用Publisher做类名,而是用的Provider,没有特别的用意,就是在起名字的时候不小心写成了这样,不需要在意这个细节,O(∩_∩)O。
    package com.sam.hello_rabbitmq;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Provider {
    
        //定义队列名
        static String QUEUE_NAME = "helloRabbit";
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                //1.创建连接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
                
                //2.为通道声明队列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                
                //3.发布消息
                String msg = " hello rabbitmq, welcome to sam's blog.";
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                System.out.println("provider send a msg: " + msg);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                //4.关闭连接
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    
    }

    在第2步中,channel.queueDeclare 用来创建队列,有5个参数:String queue, 队列名; boolean durable, 该队列是否需要持久化; boolean exclusive,该队列是否为该通道独占的(其他通道是否可以消费该队列); boolean autoDelete,该队列不再使用的时候,是否让RabbitMQ服务器自动删除掉; Map<String, Object> arguments 其他参数。第3步中,channel.basicPublish 发布消息(用在生产者),有4个参数:String exchange, 路由器(有的资料翻译成交换机)的名字,即将消息发到哪个路由器; String routingKey, 路由键,即发布消息时,该消息的路由键是什么; BasicProperties props, 指定消息的基本属性; byte[] body 消息体,也就是消息的内容,是字节数组。 可能你会疑惑,为什么没有exchange呢?因为如果声明了队列,可以不声明路由器。

  2. 接着来实现消费者,消费者实现和生产者过程差不多,但是在这里并没有关闭连接和通道,是因为要消费者一直等待随时可能发来的消息。代码如下:
    package com.sam.hello_rabbitmq;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class HelloConsumer {
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.创建连接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.为通道声明队列
                channel.queueDeclare(Provider.QUEUE_NAME, false, false, false, null);
                System.out.println(" **** keep alive ,waiting for messages, and then deal them");
                // 3.通过回调生成消费者
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
                        
                        //获取消息内容然后处理
                        String msg = new String(body, "UTF-8");
                        System.out.println("*********** HelloConsumer" + " get message :[" + msg +"]");
                    }
                };
                
                //4.消费消息
                channel.basicConsume(Provider.QUEUE_NAME, true, consumer);
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }

    在第4步中,channel.basicConsume 用来接收消息,用在消费者,有3个参数:String queue, 队列名字,即要从哪个队列中接收消息; boolean autoAck, 是否自动确认,默认true; Consumer callback 消费者,即谁接收消息。

 

三、运行代码并调试问题

代码写好了,接下来进行测试,

  1. 先来执行下Provider.java,发现报错了:
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    java.io.IOException
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:124)
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:120)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:142)
        at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:952)
        at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333)
        at com.sam.hello_rabbitmq.Provider.main(Provider.java:36)
    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'helloRabbit' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10)
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:443)
        at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:263)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:136)
        ... 3 more
    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'helloRabbit' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10)
        at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:509)
        at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:340)
        at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)
        at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109)
        at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:643)
        at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
        at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581)
        at java.lang.Thread.run(Thread.java:745)
    Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'helloRabbit' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10)
        at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:345)
        at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:286)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:600)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:534)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:527)
        at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.close(AutorecoveringChannel.java:68)
        at com.sam.hello_rabbitmq.Provider.main(Provider.java:60)

    关键堆栈信息是:inequivalent arg 'durable' for queue 'helloRabbit' in vhost '/': received 'false' but current is 'true',说是helloRabbit这个队列durable(是否需要持久化)
    参数已经设定成了true 但是代码中指定的是false,冲突了,纳尼?访问RabbitMQ管理页面:http://localhost:15672/#/queues 发现已经存在一个队列helloRabbit,

    RabbitMQ入门:Hello RabbitMQ 代码实例[通俗易懂]

    点helloRabbit的链接,发现队列的durable属性确实是true。哦,原来我之前在做别的练习的时候,创建过一个叫这个名字的队列了,而且属性值刚好为true.

    RabbitMQ入门:Hello RabbitMQ 代码实例[通俗易懂]

    那么接下来删掉这个既存的队列

    RabbitMQ入门:Hello RabbitMQ 代码实例[通俗易懂]

    再去执行Provider.java,后台打印了内容,并且队列中有了一条ready的消息。RabbitMQ入门:Hello RabbitMQ 代码实例[通俗易懂]RabbitMQ入门:Hello RabbitMQ 代码实例[通俗易懂]

    问题解决!

  2. 执行HelloConsumer.java,预想的结果是在启动后,控制台直接打印出log并且RabbitMQ管理页面没有ready的消息:RabbitMQ入门:Hello RabbitMQ 代码实例[通俗易懂]

    RabbitMQ入门:Hello RabbitMQ 代码实例[通俗易懂]

    结果符合预期。

到此,全部工作完美结束。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/120899.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • goland 2021 激活码【2021.8最新】

    (goland 2021 激活码)好多小伙伴总是说激活码老是失效,太麻烦,关注/收藏全栈君太难教程,2021永久激活的方法等着你。IntelliJ2021最新激活注册码,破解教程可免费永久激活,亲测有效,下面是详细链接哦~https://javaforall.net/100143.htmlS32PGH0SQB-eyJsaWNlbnNlSW…

    2022年3月26日
    80
  • 浅析Python优势所在

    浅析Python优势所在浅析Python优势所在 Python优势的最大有点就是比其他语言更简单易学,功能强大的解释型编程语言,它有简洁明了的语法,高效率的高层数据结构,能够简单而有效地实现面向对象编程,欢迎大家学习参考。如果你仅仅认为用Python优势只能写写“HelloWorld”,那你就大错特错了。Python可以被应用到网络开发、GUI开发、图形开发、Web开发、游戏开发、手机

    2022年10月4日
    1
  • openssl 签发证书_keytool生成证书

    openssl 签发证书_keytool生成证书第一步Openssl产生私钥RSA第二步生成公钥第三步创建证书签名请求CSR文件第四步生成证书其它第一步:Openssl产生私钥(RSA)//1、生成私钥$opensslgenrsa-aes128-outfd.key2048GeneratingRSAprivatekey,2048bitlongmodulus….+++……

    2022年9月18日
    3
  • 操作系统虚拟存储管理实验报告_虚拟存储器技术

    操作系统虚拟存储管理实验报告_虚拟存储器技术操作系统虚拟存储管理实验开辟一块内存空间,作为模拟内存(malloc)空间大小为2^14字节假设系统的页面大小为256字节,每个页表项占4个字节(系统的物理页面数为2^6,每个页表正好占一个页面)用位图刻画内存页面的分配状态,可以用一个辅助的变量来对空闲内存页面计数每个进程的虚拟地址空间也是2^14字节每个进程分配9个页面(连页表一共10个页面)创建12个作业,并模拟作业的运行…

    2022年9月26日
    7
  • linux export添加环境变量_查看环境变量linux

    linux export添加环境变量_查看环境变量linux环境变量定义:Itsanamedobjectthatcanbeusedbymultipleapplicationsasitcontainssomevaluableinformationrequiredbytheseapplications环境变量时一个具有特定名字的对象,包含了一个或多个应用程序要用到的信息.可通俗理解为,假如一个工厂里有一大堆的工具

    2025年8月31日
    5
  • Instsrv.exe和Srvany.exe的使用方法

    Instsrv.exe和Srvany.exe的使用方法source: http://www.cnblogs.com/zhushunli/p/6043147.htmlInstsrv.exe和Srvany.exe的使用方法要把应用程序添加为服务,你需要两个小软件:Instsrv.exe和Srvany.exe。Instsrv.exe可以给系统安装和删除服务,Srvany.exe可以让程序以服务的方式运行。这两个软件都包含在W

    2022年6月13日
    23

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号