手把手教你如何玩转消息中间件(ActiveMQ)

手把手教你如何玩转消息中间件(ActiveMQ)情景引入情景分析基本概念的引导本模块主要讲解关于消息中间件的相关基础知识 也是方便我们后面的学习 什么是中间件 非操作系统软件 非业务应用软件 不是直接给最终用户使用 不能直接给用户带来价值的软件 我们就可以称为中间件 比如 Dubbo Tomcat Jetty Jboss 都是属于的 什么是消息中间件 百度百科解释 消息中间件利用高效可靠的消息传递机制进行平

情景引入

什么是消息中间件?

为什么要使用消息中间件

  • 提供者:实现JMS的消息服务中间件服务器。
  • 客户端:发送或接受消息的应用。
  • 生产者/发布者:创建并发送消息的客户端。
  • 消费者/订阅者:接受并处理消息的客户端。
  • 消息:应用程序之间传递的数据。
  • 消息模式:在客户端之间传递消息的模式,JMS主要是队列模式和主体模式。
  • 队列模式特点:
    (1)客户端包括生产者和消费者。
    (2)队列中的一个消息只能被一个消费者使用。
    (3)消费者可以随时取消息。






  • 主体模式特点:
    (1)客户端包括发布者和订阅者。
    (2)主题中的消息可以被所有订阅者消费。
    (3)消费者不能消费订阅之前发送的消息。






什么是AMQP?

  • JMS是定义与Java,而AMQP是一种传输层协议。
  • JMS是属于Java的API,而AMQP是跨语言的。
  • JMS消息类型只有两种(主题和队列,后续会说),而AMQP是有五种。
  • JMS主要就是针对Java的开发的Client,而AMQP是面向消息,队列,路由。

什么是ActiveMQ呢?

ActiveMQ的安装

环境:Windows

环境:Linux

wget https://mirrors.tuna.tsinghua.edu.cn/apache//activemq/5.15.4/apache-activemq-5.15.4-bin.tar.gz 
./activemq start 
  • ConnectionFactory:用于创建连接到消息中间件的连接工厂。
  • Connection:代表了应用程序和服务之间的连接通路。
  • Destination:指消息发布的地点,包括队列模式和主体模式。
  • Session:表示一个单线程的上下文,用于发送和接受消息。
  • MessageConsumer:由会话创建,用于接受发送到目的的消息。
  • MessageProducer:由会话创建,用于发送消息。
  • Message:是在消费者和生产者之间传递的对象,消息头,一组消息属性,和一个消息体。
    这里写图片描述
    环境:IDEA
    步骤:






  1. 使用IDEA创建一个Maven项目,最简单的骨架即可(quick)
  2. 导入ActiveMq的依赖
               org.apache.activemq             activemq-all             5.9.0        

情形一:队列模型的消息

  1. 编写生产者代码(使用队列模型的消息)
package com.hnu.scw.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; / * @ Author :scw * @ Date :Created in 上午 11:06 2018/7/14 0014 * @ Description:用于消息的创建类 * @ Modified By: * @Version: $version$ */ public class MessageProducer { //定义ActivMQ的连接地址 private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; //定义发送消息的队列名称 private static final String QUEUE_NAME = "MyMessage"; public static void main(String[] args) throws JMSException { //创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //创建连接 Connection connection = activeMQConnectionFactory.createConnection(); //打开连接 connection.start(); //创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建队列目标 Destination destination = session.createQueue(QUEUE_NAME); //创建一个生产者 javax.jms.MessageProducer producer = session.createProducer(destination); //创建模拟100个消息 for (int i = 1 ; i <= 100 ; i++){ TextMessage message = session.createTextMessage("我发送message:" + i); //发送消息 producer.send(message); //在本地打印消息 System.out.println("我现在发的消息是:" + message.getText()); } //关闭连接 connection.close(); } } 
  1. 查看是否消息产生成功
    这里写图片描述

  2. 编写消费者代码(消费队列模型的消息)
package com.hnu.scw.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; / * @ Author :scw * @ Date :Created in 上午 11:30 2018/7/14 0014 * @ Description:消息消费者 * @ Modified By: * @Version: $version$ */ public class MessageConsumer { //定义ActivMQ的连接地址 private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; //定义发送消息的队列名称 private static final String QUEUE_NAME = "MyMessage"; public static void main(String[] args) throws JMSException { //创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //创建连接 Connection connection = activeMQConnectionFactory.createConnection(); //打开连接 connection.start(); //创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建队列目标 Destination destination = session.createQueue(QUEUE_NAME); //创建消费者 javax.jms.MessageConsumer consumer = session.createConsumer(destination); //创建消费的监听 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("获取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } } 
  1. 查看是否进行了消费
    这里写图片描述
    备注:我上面进行的是队列模式的消息,而且进行的都是单个消费者,那如果我换成同时有两个消费者消费生产者的消息会怎么样呢?(我们只需要运行两个消费者就可以啦。当然,要保证生产者是产生了消息的哦~~~~否则,拿什么消费呢~)
    一个生产者,两个消费者的情况如下:
    切记:先运行两个消费者,然后再运行生产者代码:
    结果如下:
    这里写图片描述
    这里写图片描述














    其实,这就是解释了,我之前说的,队列模式的消息,是只会被一个消费者所使用的,而不会被共享,这也就是和主题模型的差别哦~~~哈哈

情形二:主题模型的消息

package com.hnu.scw.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; / * @ Author :scw * @ Date :Created in 上午 11:48 2018/7/14 0014 * @ Description:${description} * @ Modified By: * @Version: $version$ */ public class MessageTopicProducer { //定义ActivMQ的连接地址 private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; //定义发送消息的主题名称 private static final String TOPIC_NAME = "MyTopicMessage"; public static void main(String[] args) throws JMSException { //创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //创建连接 Connection connection = activeMQConnectionFactory.createConnection(); //打开连接 connection.start(); //创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建队列目标 Destination destination = session.createTopic(TOPIC_NAME); //创建一个生产者 javax.jms.MessageProducer producer = session.createProducer(destination); //创建模拟100个消息 for (int i = 1; i <= 100; i++) { TextMessage message = session.createTextMessage("当前message是(主题模型):" + i); //发送消息 producer.send(message); //在本地打印消息 System.out.println("我现在发的消息是:" + message.getText()); } //关闭连接 connection.close(); } } 
  1. 查看生产者的消息
    这里写图片描述

  2. 编写消费者
package com.hnu.scw.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; / * @ Author :scw * @ Date :Created in 上午 11:50 2018/7/14 0014 * @ Description:${description} * @ Modified By: * @Version: $version$ */ public class MessageTopicConsumer { //定义ActivMQ的连接地址 private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; //定义发送消息的队列名称 private static final String TOPIC_NAME = "MyTopicMessage"; public static void main(String[] args) throws JMSException { //创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //创建连接 Connection connection = activeMQConnectionFactory.createConnection(); //打开连接 connection.start(); //创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建队列目标 Destination destination = session.createTopic(TOPIC_NAME); //创建消费者 javax.jms.MessageConsumer consumer = session.createConsumer(destination); //创建消费的监听 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("获取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } } 
  1. 查看是否消费成功
    然而,我们运行消费者代码,发现怎么没有消息消费呢?????????
    其实,这就是主题模型的一个特点,如果消费者是在生产者产生消息之后来的,那么是不会对之前的消息进行消费的哦。。。现在知道它们的区别在哪了吧。
    如果,现在是两个消费者和一个生产者的主题模型又是怎么的结果呢?
    这里写图片描述
    这里写图片描述
    哎哟。。。。这种情况消费者都各自消费了所有的生产者的消息耶。。。。。这就是共享性消息的主题模式,这就是和队列模型的区别,,,大家好好的对比哦~~












ActiveMQ使用(基于Spring)

步骤:

  1. 创建一个Maven项目(基于最简单的quick骨架即可)
  2. 导入Spring和ActiveMQ的相关依赖
 
   
   
   
     4.0.0 
    
   
     com.hnu.scw 
    
   
     activemq 
    
   
     1.0-SNAPSHOT 
    
   
     activemq 
    
    
   
     http://www.example.com 
    
    
    
      UTF-8 
     
    
      1.7 
     
    
      1.7 
     
    
      4.2.5.RELEASE 
     
    
    
     
     
       junit 
      
     
       junit 
      
     
       4.11 
      
     
       test 
      
     
     
     
     
       org.apache.activemq 
      
     
       activemq-all 
      
     
       5.9.0 
      
     
     
     
     
       org.springframework 
      
     
       spring-context 
      
     
       ${spring.version} 
      
     
     
     
       org.springframework 
      
     
       spring-jms 
      
     
       ${spring.version} 
      
     
     
     
       org.springframework 
      
     
       spring-test 
      
     
       ${spring.version} 
      
     
     
     
       org.apache.activemq 
      
     
       activemq-core 
      
     
       5.7.0 
      
      
       
       
         spring-context 
        
       
         org.springframework 
        
       
      
     
    
    
     
      
      
       
       
         maven-clean-plugin 
        
       
         3.0.0 
        
       
       
       
       
         maven-resources-plugin 
        
       
         3.0.2 
        
       
       
       
         maven-compiler-plugin 
        
       
         3.7.0 
        
       
       
       
         maven-surefire-plugin 
        
       
         2.20.1 
        
       
       
       
         maven-jar-plugin 
        
       
         3.0.2 
        
       
       
       
         maven-install-plugin 
        
       
         2.5.2 
        
       
       
       
         maven-deploy-plugin 
        
       
         2.8.2 
        
       
      
     
    
   
  1. 编写生产者的配置文件.xml,取名为producer.xml
 
   
   
    
    
    
     
    
    
    
     
    
    
    
     
    
    
    
     
    
   
  1. 编写生产者的接口
package com.hnu.scw.spring; / * @ Author :scw * @ Date :Created in 下午 12:19 2018/7/14 0014 * @ Description:生产者的接口 * @ Modified By: * @Version: $version$ */ public interface ProduceService { void sendMessage(String msg); } 
  1. 编写生产者的实现
package com.hnu.scw.spring; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import javax.annotation.Resource; import javax.jms.*; / * @ Author :scw * @ Date :Created in 下午 2:21 2018/7/15 0015 * @ Description:生产者的实现类 * @ Modified By: * @Version: $version$ */ public class ProduceServiceImpl implements ProduceService { @Autowired private JmsTemplate jmsTemplate; @Resource(name = "queueDestination") private Destination destination; / * 发送消息 * @param msg */ @Override public void sendMessage(final String msg) { jmsTemplate.send(destination , new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(msg); return textMessage; } }); System.out.println("现在发送的消息为: " + msg); } } 
  1. 将生产者的类添加到上述的配置文件中
 
   
   
  1. 编写生产者的测试类
package com.hnu.scw.spring; import org.springframework.context.support.ClassPathXmlApplicationContext; / * @ Author :scw * @ Date :Created in 下午 2:27 2018/7/15 0015 * @ Description:生产者的测试 * @ Modified By: * @Version: $version$ */ public class ProducerTest { public static void main(String[] args){ ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("producer.xml"); ProduceService bean = classPathXmlApplicationContext.getBean(ProduceService.class); //进行发送消息 for (int i = 0; i < 100 ; i++) { bean.sendMessage("test" + i); } //当消息发送完后,关闭容器 classPathXmlApplicationContext.close(); } } 
  1. 运行测试类,查看生产者是否产生消息成功
    这里写图片描述
    通过上述的界面,就可以看到自己配置的队列模式的消息产生成功。




  2. 编写消费者的消息监听类
  3. 编写消费者的配置文件,命名为consumer.xml
                                                                                                                   
  1. 消息消费者ComsumerMessageListener类代码
package com.hnu.scw.spring; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; / * @ Author :scw * @ Date :Created in 下午 3:06 2018/7/15 0015 * @ Description:消息的监听者,用于处理消息 * @ Modified By: * @Version: $version$ */ public class ComsumerMessageListener implements MessageListener { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接受到消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } 
  1. 编写测试文件,测试消费者消费消息是否成功
package com.hnu.scw.spring; import org.springframework.context.support.ClassPathXmlApplicationContext; / * @ Author :scw * @ Date :Created in 下午 3:13 2018/7/15 0015 * @ Description:消费者的测试 * @ Modified By: * @Version: $version$ */ public class ConsumerTest { public static void main(String[] args){ //启动消费者 ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("consumer.xml"); } } 
  1. 查看ActiveMQ网站具体消息情况
    这里写图片描述
    这里写图片描述




  2. ActiveMQ的队列模型就大功告成啦。。。。。。so easy!!!
    备注:上面都是进行的ActiveMQ的队列模型的配置,那么我们如果想进行主题模型的又是如何进行操作呢?其实也很简单,只需要修改生产者的xml文件里面的队列即可。比如如下代码:

       

ActiveMQ的集群

为什么要进行集群呢?

搭建步骤(基于Windows环境,而Linux环境也是一样的操作)

  1. 复制三个ActiveMQ的服务配置到一个公共目录
    这里写图片描述

  2. 修改activeMQA的配置文件
    这里写图片描述
    只需要在activemq.xml添加如下内容:




           
  1. 修改ActiveMQB的配置文件
    (1)首先在activemq,xml中添加如下内容:

                              

(2)修改jetty.xml内容,修改服务器的服务端口

 
   
    
    
    
   
  1. 修改ActiveMQC的配置文件(其实类似和B一样,只是服务端口不一样)
    (1)修改activemq.xml中的内容

 
   
   
   
    
   
   
   
    
   

(2)修改jetty.xml中的内容

 
   
    
    
    
   
  1. 集群搭建完成~~~~

集群测试(基于IDEA编辑器+Maven)

 
   
   
   
     org.apache.activemq 
    
   
     activemq-all 
    
   
     5.9.0 
    
   

(3)编写生产者代码

package com.hnu.scw.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; / * @ Author :scw * @ Date :Created in 上午 11:06 2018/7/14 0014 * @ Description:用于消息的创建类 * @ Modified By: * @Version: $version$ */ public class MessageProducer { //通过集群的方式进行消息服务器的管理(failover就是进行动态转移,当某个服务器宕机, // 那么就进行其他的服务器选择,randomize表示随机选择) private static final String ACTIVEMQ_URL = "failover:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true"; //定义发送消息的队列名称 private static final String QUEUE_NAME = "MyMessage"; public static void main(String[] args) throws JMSException { //创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //创建连接 Connection connection = activeMQConnectionFactory.createConnection(); //打开连接 connection.start(); //创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建队列目标 Destination destination = session.createQueue(QUEUE_NAME); //创建一个生产者 javax.jms.MessageProducer producer = session.createProducer(destination); //创建模拟100个消息 for (int i = 1 ; i <= 100 ; i++){ TextMessage message = session.createTextMessage("当前message是:" + i); //发送消息 producer.send(message); //在本地打印消息 System.out.println("我现在发的消息是:" + message.getText()); } //关闭连接 connection.close(); } } 

(4)编写消费者代码

package com.hnu.scw.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; / * @ Author :scw * @ Date :Created in 上午 11:30 2018/7/14 0014 * @ Description:消息消费者 * @ Modified By: * @Version: $version$ */ public class MessageConsumer { //通过集群的方式进行消息服务器的管理(failover就是进行动态转移,当某个服务器宕机, // 那么就进行其他的服务器选择,randomize表示随机选择) private static final String ACTIVEMQ_URL = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true"; //定义发送消息的队列名称 private static final String QUEUE_NAME = "MyMessage"; public static void main(String[] args) throws JMSException { //创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //创建连接 Connection connection = activeMQConnectionFactory.createConnection(); //打开连接 connection.start(); //创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建队列目标 Destination destination = session.createQueue(QUEUE_NAME); //创建消费者 javax.jms.MessageConsumer consumer = session.createConsumer(destination); //创建消费的监听 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("获取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } } 

(5)进行查看各自的服务器的消息队列的情况。

  1. 首先,是要确保三个ActiveMQ服务器都进行打开。分析:当三个都服务都运行之后,我们从浏览器运行各自的地址,会发现:
    比如:我这里的三个服务的地址分别如下:

  • http://127.0.0.1:8161/
  • http://127.0.0.1:8162/
  • http://127.0.0.1:8163/

重点

其他的消息中间件

其实,类似ActiveMQ这样的消息中间件,用得比较多的还有就是RabbitMQ和Kafka。它们三者各自有各自的优势。大家可以百度进行了解,我就不进行多说了。后面我会同样把这两种消息中间件的使用进行详细的讲解,欢迎大家的关注哦~总的来说,只有适合的场景对应的消息中间件才能发挥最大的作用,没有一种是只有好处而没有坏处的~
#总结

  • 主要是对消息中间件的基础知识进行讲解。
  • 主要讲解ActiveMQ的使用
  • 主要讲解了关于ActiveMQ的集群的搭建
  • 稍微提到了类似ActiveMQ消息中间件的其他中间件
  • 我所讲述的内容,够大家进行入门了,如果要进行深入的了解还是需要慢慢的去熟悉和学习的,而且消息中间件是非常重要的一个技术,希望大家去好好的了解。
  • 最后,感谢各位的阅读哦~~~~
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/226155.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月17日 上午7:42
下一篇 2026年3月17日 上午7:42


相关推荐

  • 复试的时候面试官问我还有什么问题(和面试官聊得很好但没有录用)

    我是一名程序员,我的主要编程语言是Java,我更是一名Web开发人员,所以我必须要了解HTTP,所以本篇文章就来带你从HTTP入门到进阶,看完让你有一种恍然大悟、醍醐灌顶的感觉。最初在有网络之前,我们的电脑都是单机的,单机系统是孤立的,我还记得05年前那会儿家里有个电脑,想打电脑游戏还得两个人在一个电脑上玩儿,及其不方便。我就想为什么家里人不让上网,我的同学xxx家里有网,每…

    2022年4月17日
    46
  • Servlet.service() for servlet [dispatcherServlet] in context with path [] th

    Servlet.service() for servlet [dispatcherServlet] in context with path [] th控制台报错信息Servlet.service()forservlet[dispatcherServlet]incontextwithpath[]threwexception[Requestprocessingfailed;nestedexceptionisjava.lang.NullPointerException]withrootcausee1.controller层没有加@ResponseBody2.Service层实现类未添加注解@Autowired记

    2022年5月8日
    329
  • 将DedeCMS从子目录移动到根目录的方法

    将DedeCMS从子目录移动到根目录的方法

    2021年10月10日
    47
  • 批处理命令教程 pdf_常用批处理命令

    批处理命令教程 pdf_常用批处理命令【echo命令】  打开回显或关闭请求回显功能,或显示消息。如果没有任何参数,echo命令将显示当前回显设置。  语法  echo[{on|off}][message]  Sample:@echooff/echohelloworld  在实际应用中我们会把这条命令和重定向符号(也称为管道符号,一般用>>>^)结合来实现输入一  些…

    2022年8月22日
    7
  • python for循环语句怎么写

    python for循环语句怎么写想必大家都知道 python 循环语句吧 可以 python 循环语句有多种 比如 for 循环 while 循环 if else 等等 今天小编就给大家讲讲 for 循环语句 for 循环语句是 python 中的一个循环控制语句 任何有序的序列对象内的元素都可以遍历 比如字符串 列表 元组等可迭代对像 之前讲过的 if 语句虽然和 for 语句用法不同 但可以用在 for 语句下做条件语句使用 for 语句的基本格式 pyth

    2025年8月10日
    5
  • 基于情感词典进行情感态度分析[通俗易懂]

    基于情感词典进行情感态度分析[通俗易懂]情感分析是指挖掘文本表达的观点,识别主体对某客体的评价是褒还是贬,褒贬根据进态度行倾向性研究。文本情感分析可以分为基于机器学习的情感分类方法和基于语义理解的情感分析。基于机器学习进行语义分析的话需要大量的训练集,同时需要人工对其进行分类标注。我所使用的方法是基于语义理解中的使用情感词典进行情感态度分析。下面是我所使用的情感词典:链接:HTTPS://pan.baidu.com/s/1xC…

    2022年8月23日
    8

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号