RabbitMQ的六种工作模式以及代码实现

RabbitMQ的六种工作模式以及代码实现一 什么是 RabbitMQrabb 是基于 amqp 协议实现一套高效的数据传输组件 MQ 消息队列 常见的 MQ ActiveMQ Kafka RocketMQ RabbitMQ 官方文档 https www rabbitmq com getstarted html 二 MQ 的应用场景 1 消息异步通知 注册时邮箱认证 添加商品生成详情页和将商品添加到搜索库等 2 消息顺序处理 3 消息延迟处理 4 请求削峰三 六种工作模式 1 1simple 简单模式 1 消息产生后将消息放入队列 2

一、什么是RabbitMQ

二、MQ的应用场景

三、六种工作模式

1.1 simple简单模式
在这里插入图片描述
1)消息产生后将消息放入队列
2)消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端)






1.2 work工作模式(资源的竞争)
在这里插入图片描述
1)消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2,同时监听同一个队列,消息被消费?C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize,与同步锁的性能不一样。保证一条消息只能被一个消费者使用)
2)应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)






1.3 publish/subscribe发布订阅(共享资源)
在这里插入图片描述
1)X代表交换机rabbitMQ内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
2)相关场景:邮件群发,群聊天,广播(广告)






1.4 routing路由模式
在这里插入图片描述
1)消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息
2)根据业务功能定义路由字符串
3)从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误








1.5 topic 主题模式(路由模式的一种)
在这里插入图片描述
1)星号、#号代表通配符
2)星号代表一个单词,#号代表一个或多个单词
3)路由功能添加模糊匹配
4)消息产生者产生消息,把消息交给交换机
5)交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费












1.6 RPC
在这里插入图片描述
RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:
1)客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
2)服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。
3)服务端将RPC方法 的结果发送到RPC响应队列。
4)客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。












四、工作模式的代码实现

1.1 simple简单模式
工具类

public class ConnectionUtil { 
    //连接rabbitmq服务,共享一个工厂对象 private static ConnectionFactory factory; static { 
    factory=new ConnectionFactory(); //设置rabbitmq属性 factory.setHost("192.168.65.128"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/admin"); factory.setPort(5672); } public static Connection getConnection(){ 
    Connection connection=null; try { 
    //获取连接对象 connection = factory.newConnection(); } catch (Exception e) { 
    e.printStackTrace(); } return connection; } } 

提供方

public class Provider { 
    public static void main(String[] args) { 
    try { 
    //获取连接对象 Connection connection = ConnectionUtil.getConnection(); //获取通道对象 Channel channel = connection.createChannel(); //通过通道创建队列,后续所有的操作都是基于channel实现(队列也可以由消费方创建) channel.queueDeclare("queue1",false,false,false,null); //向队列中发送消息 channel.basicPublish("","queue1",null,"Hello RabbitMQ!!!".getBytes()); //断开连接 connection.close(); } catch (Exception e) { 
    e.printStackTrace(); } } } 

消费方

public class Consumer { 
    public static void main(String[] args) { 
    Connection connection = ConnectionUtil.getConnection(); try { 
    //获取通道对象 Channel channel = connection.createChannel(); //监听队列中的消息(消费的是队列,而不是交换机) channel.basicConsume("queue1",true,new DefaultConsumer(channel){ 
    @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 
    System.out.println("消费者获得消息为:"+new String(body,"utf-8")); } }); //消费方不需要关闭连接,保持一直监听队列状态 //connection.close(); }catch (Exception e){ 
    e.printStackTrace(); } } } 

1.2 work工作模式(资源的竞争)
提供方

public class Provider { 
    public static void main(String[] args) { 
    try { 
    //获取连接对象 Connection connection = ConnectionUtil.getConnection(); //获取通道对象 Channel channel = connection.createChannel(); //通过通道创建队列 channel.queueDeclare("queue1",false,false,false,null); //向队列中发送消息 for(int i=1;i<=10;i++){ 
    channel.basicPublish("","queue1",null,("Hello RabbitMQ!!!"+i).getBytes()); } //断开连接 connection.close(); } catch (Exception e) { 
    e.printStackTrace(); } } } 

消费方1

public class Consumer { 
    public static void main(String[] args) { 
    Connection connection = ConnectionUtil.getConnection(); try { 
    //获取通道对象 Channel channel = connection.createChannel(); //监听队列中的消息 channel.basicConsume("queue1",true,new DefaultConsumer(channel){ 
    @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 
    System.out.println("消费者1获得消息为:"+new String(body,"utf-8")); } }); //消费方不需要关闭连接,保持一直监听队列状态 //connection.close(); }catch (Exception e){ 
    e.printStackTrace(); } } } 

消费方2

public class Consumer2 { 
    public static void main(String[] args) { 
    Connection connection = ConnectionUtil.getConnection(); try { 
    //获取通道对象 Channel channel = connection.createChannel(); //监听队列中的消息 channel.basicConsume("queue1",true,new DefaultConsumer(channel){ 
    @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 
    System.out.println("消费者2获得消息为:"+new String(body,"utf-8")); } }); //消费方不需要关闭连接,保持一直监听队列状态 //connection.close(); }catch (Exception e){ 
    e.printStackTrace(); } } } 

1.3 publish/subscribe发布订阅(共享资源)
提供方

//交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。保证,哪一方先运行则在哪一方创建 public class Provider { 
    public static void main(String[] args) { 
    try { 
    //获取连接对象 Connection connection = ConnectionUtil.getConnection(); //获取通道对象 Channel channel = connection.createChannel(); //创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)  //1.参数一:交换机名称 参数二:交换机类型 channel.exchangeDeclare("fanout_exchange","fanout"); //通过通道创建队列 //channel.queueDeclare("queue1",false,false,false,null); //向队列中发送消息 for(int i=1;i<=10;i++){ 
    channel.basicPublish("fanout_exchange","",null,("Hello RabbitMQ!!!"+i).getBytes()); } //断开连接 connection.close(); } catch (Exception e) { 
    e.printStackTrace(); } } } 

消费方1

public class Consumer { 
    public static void main(String[] args) { 
    Connection connection = ConnectionUtil.getConnection(); try { 
    //获取通道对象 Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare("fanout_queue1",false,false,false,null); //给队列绑定交换机 channel.queueBind("fanout_queue1","fanout_exchange",""); //监听队列中的消息 channel.basicConsume("fanout_queue1",true,new DefaultConsumer(channel){ 
    @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 
    System.out.println("消费者1获得消息为:"+new String(body,"utf-8")); } }); //消费方不需要关闭连接,保持一直监听队列状态 //connection.close(); }catch (Exception e){ 
    e.printStackTrace(); } } } 

消费方2

public class Consumer2 { 
    public static void main(String[] args) { 
    Connection connection = ConnectionUtil.getConnection(); try { 
    //获取通道对象 Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare("fanout_queue2",false,false,false,null); //给队列绑定交换机 channel.queueBind("fanout_queue2","fanout_exchange",""); //监听队列中的消息 channel.basicConsume("fanout_queue2",true,new DefaultConsumer(channel){ 
    @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 
    System.out.println("消费者2获得消息为:"+new String(body,"utf-8")); } }); //消费方不需要关闭连接,保持一直监听队列状态 //connection.close(); }catch (Exception e){ 
    e.printStackTrace(); } } } 

1.4 routing路由模式(不支持通配符)
提供方

//交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。保证,哪一方先运行则在哪一方创建 public class Provider { 
    public static void main(String[] args) { 
    try { 
    //获取连接对象 Connection connection = ConnectionUtil.getConnection(); //获取通道对象 Channel channel = connection.createChannel(); //创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失) //1.参数一:交换机名称 参数二:交换机类型 channel.exchangeDeclare("direct_exchange","direct"); //向队列中发送消息 for(int i=1;i<=10;i++){ 
    channel.basicPublish("direct_exchange", "insert", //设置路由键,符合路由键的队列,才能拿到消息 null, ("Hello RabbitMQ!!!"+i).getBytes()); } //断开连接 connection.close(); } catch (Exception e) { 
    e.printStackTrace(); } } } 

消费方1

public class Consumer { 
    public static void main(String[] args) { 
    Connection connection = ConnectionUtil.getConnection(); try { 
    //获取通道对象 Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare("direct_queue1",false,false,false,null); //绑定交换机(routingKey:路由键) channel.queueBind("direct_queue1","direct_exchange","select"); channel.queueBind("direct_queue1","direct_exchange","insert"); //监听队列中的消息 channel.basicConsume("direct_queue1",true,new DefaultConsumer(channel){ 
    @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 
    System.out.println("消费者1获得消息为:"+new String(body,"utf-8")); } }); //消费方不需要关闭连接,保持一直监听队列状态 //connection.close(); }catch (Exception e){ 
    e.printStackTrace(); } } } 

消费方2

public class Consumer2 { 
    public static void main(String[] args) { 
    Connection connection = ConnectionUtil.getConnection(); try { 
    //获取通道对象 Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare("direct_queue2",false,false,false,null); //绑定交换机(routingKey:路由键) channel.queueBind("direct_queue2","direct_exchange","delete"); channel.queueBind("direct_queue2","direct_exchange","select"); //监听队列中的消息 channel.basicConsume("direct_queue2",true,new DefaultConsumer(channel){ 
    @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 
    System.out.println("消费者2获得消息为:"+new String(body,"utf-8")); } }); //消费方不需要关闭连接,保持一直监听队列状态 //connection.close(); }catch (Exception e){ 
    e.printStackTrace(); } } } 

1.5 topic 主题模式(路由模式的一种,支持通配符)
提供方

 //交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。保证,哪一方先运行则在哪一方创建 public class Provider { 
    public static void main(String[] args) { 
    try { 
    //获取连接对象 Connection connection = ConnectionUtil.getConnection(); //获取通道对象 Channel channel = connection.createChannel(); //创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失) //1.参数一:交换机名称 参数二:交换机类型 channel.exchangeDeclare("topic_exchange","topic"); //向队列中发送消息 for(int i=1;i<=10;i++){ 
    channel.basicPublish("topic_exchange", "emp.hello world", // #:匹配0-n个单词(之间以.区分,两点之间算一个单词,可以匹配hello world空格的情况) *(匹配一个单词) null, ("Hello RabbitMQ!!!"+i).getBytes()); } //断开连接 connection.close(); } catch (Exception e) { 
    e.printStackTrace(); } } } 

消费方1

public class Consumer { 
    public static void main(String[] args) { 
    Connection connection = ConnectionUtil.getConnection(); try { 
    //获取通道对象 Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare("topic_queue1",false,false,false,null); //绑定交换机(routingKey:路由键) #:匹配0-n个单词(之间以.区分,两点之间算一个单词) channel.queueBind("topic_queue1","topic_exchange","emp.#"); //监听队列中的消息 channel.basicConsume("topic_queue1",true,new DefaultConsumer(channel){ 
    @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 
    System.out.println("消费者1获得消息为:"+new String(body,"utf-8")); } }); //消费方不需要关闭连接,保持一直监听队列状态 //connection.close(); }catch (Exception e){ 
    e.printStackTrace(); } } } 

消费方2

public class Consumer2 { 
    public static void main(String[] args) { 
    Connection connection = ConnectionUtil.getConnection(); try { 
    //获取通道对象 Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare("topic_queue2",false,false,false,null); //绑定交换机(routingKey:路由键) *:匹配1个单词(之间以.区分,两点之间算一个单词) channel.queueBind("topic_queue2","topic_exchange","emp.*"); //监听队列中的消息 channel.basicConsume("topic_queue2",true,new DefaultConsumer(channel){ 
    @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 
    System.out.println("消费者2获得消息为:"+new String(body,"utf-8")); } }); //消费方不需要关闭连接,保持一直监听队列状态 //connection.close(); }catch (Exception e){ 
    e.printStackTrace(); } } } 

总结

有不清楚的地方可以在评论下方留言。。。既然来了,不妨点个关注,点个赞吧!!!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/220057.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月17日 下午9:21
下一篇 2026年3月17日 下午9:21


相关推荐

  • Long类型转Integer

    Long类型转IntegerIntegerx Long valueOf y intValue y 为 Long 包装类

    2026年3月17日
    1
  • UDP协议支持广播发送数据_tcp协议建立连接的过程

    UDP协议支持广播发送数据_tcp协议建立连接的过程UDP(用户数据报协议)是OSI(OpenSystemInterconnection,开放式系统互联)参考模型中一种无连接的传输层协议,提供面向事务的简单不可靠信息传送服务。目录什么是UDP协议?UDP协议数据传输原理DatagramPacket类DatagramSocket类UDP协议网络通信客户端服务器程序服务器端程序客户端程序Hello!大家好!我是灰小猿。之前和大家分享了使用TCP协议进行网络通信的过程,想了解的小伙伴可以看我的这篇文章《Java利.

    2022年5月3日
    78
  • 自己动手写操作系统(三)

    自己动手写操作系统(三)作者 伊梅本文选自 开放系统世界 赛迪网 2002 年 12 月 04 日 http developer ccidnet com pub disp Article columnID 322 amp articleID 32660 amp pageNO 1 在上两期中 自己动手写操作系统 1 2 我向大家讲述了如何使用 Linux 提供的开发工具在软盘的启动扇区写一些代码 以及如何调用 BIOS 的问题 现在 这个操作系统已

    2026年3月26日
    2
  • 关于一个网站FLASH导航条的制作

    关于一个网站FLASH导航条的制作http shineday nease net 这里的代码是抄袭过来的 根本是为了说明问题和解决问题 vardrag 0 1 震动参数 varflex 0 7 震动参数 mc y 20 mc goalX 10 nbsp 装入动画后影片剪辑 也就是滑块 首先出现的位置 mc onEnterFrame function nbsp nbsp this Step this Step

    2026年3月17日
    2
  • ldap 统一认证 java_基于LDAP的统一身份认证系统的设计与实现

    ldap 统一认证 java_基于LDAP的统一身份认证系统的设计与实现摘要 随着全球信息化和 Internet 技术的迅速发展 信息化建设水平已成为衡量一个国家和地区综合实力的重要标志 在信息化建设进程中 信息的安全问题日益突出 作为信息网络安全的一个重要方面 身份认证和单点登录技术的应用日益广泛 迫切需要一种支持多种平台 统一多种认证方式 易于管理 安全的认证系统 本文根据当前正在建设的邯郸市居民卡项目的需求 设计并实现了一种新的统一身份认证系统 本文首先阐述了现有

    2026年3月17日
    2
  • Linux服务器Tomcat中Catalina.log中定位错误信息方法

    linux 中tomcat日志分析通过命令定位找到错误信息

    2022年2月26日
    60

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号