ActiveMQ入门系列三:发布/订阅模式

在上一篇《ActiveMQ入门系列二:入门代码实例(点对点模式)》中提到了ActiveMQ中的两种模式:点对点模式(PTP)和发布/订阅模式(Pub&Sub),详细介绍了点对点模式并用代码

大家好,又见面了,我是全栈君。

在上一篇《ActiveMQ入门系列二:入门代码实例(点对点模式)》中提到了ActiveMQ中的两种模式:点对点模式(PTP)和发布/订阅模式(Pub & Sub),详细介绍了点对点模式并用代码实例进行说明,今天就介绍下发布/订阅模式。

一、理论基础

发布/订阅模式的工作示意图:

ActiveMQ入门系列三:发布/订阅模式

  • 消息生产者将消息(发布)到topic中,可以同时有多个消息消费者(订阅)消费该消息。
  • 和点对点方式不同,发布到topic的消息会被所有订阅者消费。
  • 当生产者发布消息,不管是否有消费者,都不会保存消息。
  • 一定要先有消息的消费者,后有消息的生产者。

二、代码实现

  1. 生产者
    package com.sam.topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * @author JAVA开发老菜鸟
     *
     */
    public class TopicProducer {
    
        public static final  String QUEUE_NAME = "topic-demo";//队列名
    
        public void producer(String message) throws JMSException {
            ConnectionFactory factory = null;
            Connection connection = null;
            Session session = null;
            MessageProducer producer = null;
            try {
                /**
                 * 1.创建连接工厂
                 * 创建工厂,构造方法有三个参数:分别是用户名、密码、连接地址
                 * 无参构造:有默认的连接地址,localhost
                 * 一个参数:无验证模式,无用户的认证
                 * 三个参数:有认证和连接地址
                 */
                factory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");
                /**
                 * 2.创建连接
                 * 无参数
                 * 有参数:用户名、密码
                 */
                connection = factory.createConnection();
                /**
                 * 3.启动连接
                 * 生产者可以不启动,因为在发送消息的时候回进行检查
                 * 如果未启动连接,会自动启动
                 * 如果有特殊配置,需要配置完成后再启动连接
                 */
                connection.start();
                /**
                 * 4.用连接创建会话
                 * 有两个参数:是否需要事务、消息确认机制
                 * 如果支持事务,对于生产者来说第二个参数就无效了,建议传入Session.SESSION_TRANSACTED
                 * 如果不支持事务,第二个参数必须传递且有效
                 *
                 * AUTO_ACKNOWLEDGE:自动确认,消息处理后自动确认(商业开发不推荐)
                 * CLIENT_ACKNOWLEDGE:客户端手动确认,消费者处理后必须手动确认
                 * DUPS_OK_ACKNOWLEDGE:有副本的客户端手动确认,消息可以多次处理(不建议)
                 */
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                /**
                 * 5.用会话创建目的地(主题)、生产者、消息
                 * 队列名是队列的唯一标记
                 * 创建生产者的时候可以不指定目的地,可以在发送的时候指定
                 */
                Destination destination = session.createTopic(QUEUE_NAME);
                producer = session.createProducer(destination);
                TextMessage textMessage = session.createTextMessage(message);
                /**
                 * 6.生产者发送消息到目的地
                 */
                producer.send(textMessage);
                System.out.println("消息发送成功");
            } catch(Exception ex){
                throw ex;
            } finally {
                /**
                 * 7.释放资源
                 */
                if(producer != null){
                    producer.close();
                }
    
                if(session != null){
                    session.close();
                }
    
                if(connection != null){
                    connection.close();
                }
            }
        }
    
        public static void main(String[] args){
            TopicProducer producer = new TopicProducer();
            try{
                producer.producer("hello, activemq");
            } catch (Exception ex){
                ex.printStackTrace();
            }
        }
    }

    发布/订阅模式的生产者和点对点模式的代码主要区别就是Destination的创建方式,点对点模式是调用session.createQueue(QUEUE_NAME),而发布/订阅模式是调用session.createTopic(QUEUE_NAME)。

  2. 消费者
    package com.sam.topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.io.IOException;
    
    /**
     * @author JAVA开发老菜鸟
     *
     * 观察者消费--监听消费
     */
    public class TopicConsumer {
    
        public void consumer() throws JMSException, IOException {
            ConnectionFactory factory = null;
            Connection connection = null;
            Session session = null;
            MessageConsumer consumer = null;
            try {
                factory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");
                connection = factory.createConnection();
                /**
                 * 消费者必须启动连接,否则无法消费
                 */
                connection.start();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createTopic(TopicProducer.QUEUE_NAME);
                consumer = session.createConsumer(destination);
                /**
                 * 注册监听器,队列中的消息变化会自动触发监听器,接收并自动处理消息
                 *
                 * 监听器一旦注册,永久有效,一直到程序关闭
                 * 监听器可以注册多个,相当于集群
                 * activemq自动轮询多个监听器,实现并行处理
                 */
                consumer.setMessageListener(new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
    
                        try {
                            TextMessage om = (TextMessage) message;
                            String data = om.getText();
                            System.out.println(data);
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                });
            } catch(Exception ex){
                throw ex;
            }
        }
    
        public static void main(String[] args){
            TopicConsumer consumer = new TopicConsumer();
            try{
                consumer.consumer();
            } catch (Exception ex){
                ex.printStackTrace();
            }
        }
    }

    消费者在点对点监听消费的基础上进行变化,主要区别有两个:1.同生产者一样,也是Destination的创建方式不同; 2.消息无需手动确认,直接采用自动确认机制

 

代码写完了,接下来进行测试,由于subscribe可以有多个,而且每个都可以消费到相同的消息,因此我们消费者启动两个。

先执行生产者

ActiveMQ入门系列三:发布/订阅模式

在控制台页面的Topics下出现了我定义的topic并且有1条消息发送成功且未消费

ActiveMQ入门系列三:发布/订阅模式

 

然后执行两个消费者,两个消费者都没有消费到任何消息

ActiveMQ入门系列三:发布/订阅模式

ActiveMQ入门系列三:发布/订阅模式

并且,控制台页面只是多了2个消费者,已经消费的消息还是0

ActiveMQ入门系列三:发布/订阅模式

 

 为什么呢?还记得前面的理论基础说的吗?就是这个原因

ActiveMQ入门系列三:发布/订阅模式

继续,我们在两个消费者启动好的前提下,再执行生产者, 这个时候会发现两个消费者都消费了该消息

ActiveMQ入门系列三:发布/订阅模式

ActiveMQ入门系列三:发布/订阅模式

ActiveMQ入门系列三:发布/订阅模式

再看下控制台页面

ActiveMQ入门系列三:发布/订阅模式

已消费消息这里是2,这个2并不是说之前发的两个消息都消费了,而是说第二个消息消费了2次, 1 * 2 = 2

不信的话,可以再执行一遍生产者,这个时候就是4,而不是3

ActiveMQ入门系列三:发布/订阅模式

累计发送过3条消息,消息消费了4次,这里的4就是后面两条分别被消费了2次, 2 * 2 = 4

 

三、两种模式比较

ActiveMQ入门系列三:发布/订阅模式

 

好,到这里,发布/订阅模式就介绍完了。

如果有收获,就点个赞呗

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/120886.html原文链接:https://javaforall.net

(0)
上一篇 2022年2月16日 下午2:00
下一篇 2022年2月16日 下午2:00


相关推荐

  • java服务器开发心得

    java服务器开发心得本人已从事java服务器开发三年多了,对java服务器开发比较有心得,特此对这三年多来进行下技术总结,并与大家分享。作为服务器开发,对基础知识的掌握程度,将决定你的服务器各方面的能力,一般在进行java服务器开发前,最重要的是能够熟练运用以下技术:javaclassLoader、javathread、javaI/O(NIO)和javasocket。 一般来说,服务器设计大致

    2022年5月6日
    52
  • 使用 Nginx 配置jsp服务器

    使用 Nginx 配置jsp服务器br Nginx 简介 br Nginx enginex 是一个高性能的 HTTP 和反向代理服务器 也是一个 IMAP POP3 SMTP 代理服务器 Nginx 是由 IgorSysoev 为俄罗斯访问量第二的 Rambler ru 站点开发的 它已经在该站点运行超过两年半了 Igor 将源代码以类 BSD 许可证的形式发布 尽管还是测试版 但是 Nginx 已经因为它的稳定性 丰富的功能集 示例配置文件和低系统资源的消耗而闻名了 br 根据最新一

    2026年3月18日
    1
  • 使用reaver命令穷举PIN码破解WPA2-PSK加密的无线网络[通俗易懂]

    使用reaver命令穷举PIN码破解WPA2-PSK加密的无线网络[通俗易懂]【前言】现在的路由器大多都默认用WPA2-PSK方式对无线网络进行加密了,不能再像WEP加密方式那样好破解,使用字典又需要费心费力地整理字典,而且字典破解的效率还慢。所以我们需要更有效率的破解方法。好在现在大多数的路由器都提供WPS功能,通过这个功能,用户可以使用PIN码登录到路由器。但这个PIN码的长度只有8位,而且可能的取值只有11000种(注意,不是10…

    2022年6月4日
    36
  • uniapp清除浏览器缓存[通俗易懂]

    uniapp清除浏览器缓存[通俗易懂]uni.clearStorage();

    2022年7月18日
    51
  • Mapper.xml中的useGeneratedKeys[通俗易懂]

    Mapper.xml中的useGeneratedKeys[通俗易懂]mapper.xml中添加属性“useGeneratedKeys”和“keyProperty”,其中keyProperty是Java对象的属性名.例:<insertid="insertSelective"parameterType="com.xxx.StudentMistakeKpoints"useGeneratedKeys="true"keyProperty="id">…

    2022年6月24日
    43
  • 深度学习基础之-2.6标签值归一化

    深度学习基础之-2.6标签值归一化提出问题在计算Loss时,会达到172.287,337.246这样大的数值,一般Loss都应该小于1.解决问题标签值也归一化公式如下:(1)ynew=y−yminymax−ymin=y−yminyrangey_{new}=\frac{y-y_{min}}{y_{max}-y_{min}}=\frac{y-y_{min}}{y_{range}}\tag{1}ynew​=ymax​−…

    2022年10月10日
    3

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号