情景引入
什么是消息中间件?
为什么要使用消息中间件
- 提供者:实现JMS的消息服务中间件服务器。
- 客户端:发送或接受消息的应用。
- 生产者/发布者:创建并发送消息的客户端。
- 消费者/订阅者:接受并处理消息的客户端。
- 消息:应用程序之间传递的数据。
- 消息模式:在客户端之间传递消息的模式,JMS主要是队列模式和主体模式。
- 队列模式特点:
(1)客户端包括生产者和消费者。
(2)队列中的一个消息只能被一个消费者使用。
(3)消费者可以随时取消息。 - 主体模式特点:
(1)客户端包括发布者和订阅者。
(2)主题中的消息可以被所有订阅者消费。
(3)消费者不能消费订阅之前发送的消息。
什么是AMQP?
- JMS是定义与Java,而AMQP是一种传输层协议。
- JMS是属于Java的API,而AMQP是跨语言的。
- JMS消息类型只有两种(主题和队列,后续会说),而AMQP是有五种。
- JMS主要就是针对Java的开发的Client,而AMQP是面向消息,队列,路由。
什么是ActiveMQ呢?
ActiveMQ的安装
环境:Windows
环境:Linux
wget https://mirrors.tuna.tsinghua.edu.cn/apache//activemq/5.15.4/apache-activemq-5.15.4-bin.tar.gz ./activemq start - ConnectionFactory:用于创建连接到消息中间件的连接工厂。
- Connection:代表了应用程序和服务之间的连接通路。
- Destination:指消息发布的地点,包括队列模式和主体模式。
- Session:表示一个单线程的上下文,用于发送和接受消息。
- MessageConsumer:由会话创建,用于接受发送到目的的消息。
- MessageProducer:由会话创建,用于发送消息。
- Message:是在消费者和生产者之间传递的对象,消息头,一组消息属性,和一个消息体。

环境:IDEA
步骤:
- 使用IDEA创建一个Maven项目,最简单的骨架即可(quick)
- 导入ActiveMq的依赖
org.apache.activemq activemq-all 5.9.0 情形一:队列模型的消息
- 编写生产者代码(使用队列模型的消息)
package com.hnu.scw.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; / * @ Author :scw * @ Date :Created in 上午 11:06 2018/7/14 0014 * @ Description:用于消息的创建类 * @ Modified By: * @Version: $version$ */ public class MessageProducer { //定义ActivMQ的连接地址 private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; //定义发送消息的队列名称 private static final String QUEUE_NAME = "MyMessage"; public static void main(String[] args) throws JMSException { //创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //创建连接 Connection connection = activeMQConnectionFactory.createConnection(); //打开连接 connection.start(); //创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建队列目标 Destination destination = session.createQueue(QUEUE_NAME); //创建一个生产者 javax.jms.MessageProducer producer = session.createProducer(destination); //创建模拟100个消息 for (int i = 1 ; i <= 100 ; i++){ TextMessage message = session.createTextMessage("我发送message:" + i); //发送消息 producer.send(message); //在本地打印消息 System.out.println("我现在发的消息是:" + message.getText()); } //关闭连接 connection.close(); } } - 查看是否消息产生成功

- 编写消费者代码(消费队列模型的消息)
package com.hnu.scw.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; / * @ Author :scw * @ Date :Created in 上午 11:30 2018/7/14 0014 * @ Description:消息消费者 * @ Modified By: * @Version: $version$ */ public class MessageConsumer { //定义ActivMQ的连接地址 private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; //定义发送消息的队列名称 private static final String QUEUE_NAME = "MyMessage"; public static void main(String[] args) throws JMSException { //创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //创建连接 Connection connection = activeMQConnectionFactory.createConnection(); //打开连接 connection.start(); //创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建队列目标 Destination destination = session.createQueue(QUEUE_NAME); //创建消费者 javax.jms.MessageConsumer consumer = session.createConsumer(destination); //创建消费的监听 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("获取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } } - 查看是否进行了消费

备注:我上面进行的是队列模式的消息,而且进行的都是单个消费者,那如果我换成同时有两个消费者消费生产者的消息会怎么样呢?(我们只需要运行两个消费者就可以啦。当然,要保证生产者是产生了消息的哦~~~~否则,拿什么消费呢~)
一个生产者,两个消费者的情况如下:
切记:先运行两个消费者,然后再运行生产者代码:
结果如下:


其实,这就是解释了,我之前说的,队列模式的消息,是只会被一个消费者所使用的,而不会被共享,这也就是和主题模型的差别哦~~~
哈哈
情形二:主题模型的消息
package com.hnu.scw.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; / * @ Author :scw * @ Date :Created in 上午 11:48 2018/7/14 0014 * @ Description:${description} * @ Modified By: * @Version: $version$ */ public class MessageTopicProducer { //定义ActivMQ的连接地址 private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; //定义发送消息的主题名称 private static final String TOPIC_NAME = "MyTopicMessage"; public static void main(String[] args) throws JMSException { //创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //创建连接 Connection connection = activeMQConnectionFactory.createConnection(); //打开连接 connection.start(); //创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建队列目标 Destination destination = session.createTopic(TOPIC_NAME); //创建一个生产者 javax.jms.MessageProducer producer = session.createProducer(destination); //创建模拟100个消息 for (int i = 1; i <= 100; i++) { TextMessage message = session.createTextMessage("当前message是(主题模型):" + i); //发送消息 producer.send(message); //在本地打印消息 System.out.println("我现在发的消息是:" + message.getText()); } //关闭连接 connection.close(); } }
- 查看生产者的消息

- 编写消费者
package com.hnu.scw.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; / * @ Author :scw * @ Date :Created in 上午 11:50 2018/7/14 0014 * @ Description:${description} * @ Modified By: * @Version: $version$ */ public class MessageTopicConsumer { //定义ActivMQ的连接地址 private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; //定义发送消息的队列名称 private static final String TOPIC_NAME = "MyTopicMessage"; public static void main(String[] args) throws JMSException { //创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //创建连接 Connection connection = activeMQConnectionFactory.createConnection(); //打开连接 connection.start(); //创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建队列目标 Destination destination = session.createTopic(TOPIC_NAME); //创建消费者 javax.jms.MessageConsumer consumer = session.createConsumer(destination); //创建消费的监听 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("获取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } }
- 查看是否消费成功
然而,我们运行消费者代码,发现怎么没有消息消费呢?????????
其实,这就是主题模型的一个特点,如果消费者是在生产者产生消息之后来的,那么是不会对之前的消息进行消费的哦。。。现在知道它们的区别在哪了吧。
如果,现在是两个消费者和一个生产者的主题模型又是怎么的结果呢?


哎哟。。。。这种情况消费者都各自消费了所有的生产者的消息耶。。。。。这就是共享性消息的主题模式,这就是和队列模型的区别,,,大家好好的对比哦~~
ActiveMQ使用(基于Spring)
步骤:
- 创建一个Maven项目(基于最简单的quick骨架即可)
- 导入Spring和ActiveMQ的相关依赖
4.0.0
com.hnu.scw
activemq
1.0-SNAPSHOT
activemq
http://www.example.com
UTF-8
1.7
1.7
4.2.5.RELEASE
junit
junit
4.11
test
org.apache.activemq
activemq-all
5.9.0
org.springframework
spring-context
${spring.version}
org.springframework
spring-jms
${spring.version}
org.springframework
spring-test
${spring.version}
org.apache.activemq
activemq-core
5.7.0
spring-context
org.springframework
maven-clean-plugin
3.0.0
maven-resources-plugin
3.0.2
maven-compiler-plugin
3.7.0
maven-surefire-plugin
2.20.1
maven-jar-plugin
3.0.2
maven-install-plugin
2.5.2
maven-deploy-plugin
2.8.2
- 编写生产者的配置文件.xml,取名为producer.xml
- 编写生产者的接口
package com.hnu.scw.spring; / * @ Author :scw * @ Date :Created in 下午 12:19 2018/7/14 0014 * @ Description:生产者的接口 * @ Modified By: * @Version: $version$ */ public interface ProduceService { void sendMessage(String msg); }
- 编写生产者的实现
package com.hnu.scw.spring; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import javax.annotation.Resource; import javax.jms.*; / * @ Author :scw * @ Date :Created in 下午 2:21 2018/7/15 0015 * @ Description:生产者的实现类 * @ Modified By: * @Version: $version$ */ public class ProduceServiceImpl implements ProduceService { @Autowired private JmsTemplate jmsTemplate; @Resource(name = "queueDestination") private Destination destination; / * 发送消息 * @param msg */ @Override public void sendMessage(final String msg) { jmsTemplate.send(destination , new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(msg); return textMessage; } }); System.out.println("现在发送的消息为: " + msg); } }
- 将生产者的类添加到上述的配置文件中
- 编写生产者的测试类
package com.hnu.scw.spring; import org.springframework.context.support.ClassPathXmlApplicationContext; / * @ Author :scw * @ Date :Created in 下午 2:27 2018/7/15 0015 * @ Description:生产者的测试 * @ Modified By: * @Version: $version$ */ public class ProducerTest { public static void main(String[] args){ ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("producer.xml"); ProduceService bean = classPathXmlApplicationContext.getBean(ProduceService.class); //进行发送消息 for (int i = 0; i < 100 ; i++) { bean.sendMessage("test" + i); } //当消息发送完后,关闭容器 classPathXmlApplicationContext.close(); } }
- 运行测试类,查看生产者是否产生消息成功

通过上述的界面,就可以看到自己配置的队列模式的消息产生成功。 - 编写消费者的消息监听类
- 编写消费者的配置文件,命名为consumer.xml
- 消息消费者ComsumerMessageListener类代码
package com.hnu.scw.spring; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; / * @ Author :scw * @ Date :Created in 下午 3:06 2018/7/15 0015 * @ Description:消息的监听者,用于处理消息 * @ Modified By: * @Version: $version$ */ public class ComsumerMessageListener implements MessageListener { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接受到消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } - 编写测试文件,测试消费者消费消息是否成功
package com.hnu.scw.spring; import org.springframework.context.support.ClassPathXmlApplicationContext; / * @ Author :scw * @ Date :Created in 下午 3:13 2018/7/15 0015 * @ Description:消费者的测试 * @ Modified By: * @Version: $version$ */ public class ConsumerTest { public static void main(String[] args){ //启动消费者 ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("consumer.xml"); } } - 查看ActiveMQ网站具体消息情况


- ActiveMQ的队列模型就大功告成啦。。。。。。so easy!!!
备注:上面都是进行的ActiveMQ的队列模型的配置,那么我们如果想进行主题模型的又是如何进行操作呢?其实也很简单,只需要修改生产者的xml文件里面的队列即可。比如如下代码:
ActiveMQ的集群
为什么要进行集群呢?
搭建步骤(基于Windows环境,而Linux环境也是一样的操作)
- 复制三个ActiveMQ的服务配置到一个公共目录

- 修改activeMQA的配置文件

只需要在activemq.xml添加如下内容:
- 修改ActiveMQB的配置文件
(1)首先在activemq,xml中添加如下内容:
(2)修改jetty.xml内容,修改服务器的服务端口
- 修改ActiveMQC的配置文件(其实类似和B一样,只是服务端口不一样)
(1)修改activemq.xml中的内容
(2)修改jetty.xml中的内容
- 集群搭建完成~~~~
集群测试(基于IDEA编辑器+Maven)
org.apache.activemq
activemq-all
5.9.0
(3)编写生产者代码
package com.hnu.scw.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; / * @ Author :scw * @ Date :Created in 上午 11:06 2018/7/14 0014 * @ Description:用于消息的创建类 * @ Modified By: * @Version: $version$ */ public class MessageProducer { //通过集群的方式进行消息服务器的管理(failover就是进行动态转移,当某个服务器宕机, // 那么就进行其他的服务器选择,randomize表示随机选择) private static final String ACTIVEMQ_URL = "failover:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true"; //定义发送消息的队列名称 private static final String QUEUE_NAME = "MyMessage"; public static void main(String[] args) throws JMSException { //创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //创建连接 Connection connection = activeMQConnectionFactory.createConnection(); //打开连接 connection.start(); //创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建队列目标 Destination destination = session.createQueue(QUEUE_NAME); //创建一个生产者 javax.jms.MessageProducer producer = session.createProducer(destination); //创建模拟100个消息 for (int i = 1 ; i <= 100 ; i++){ TextMessage message = session.createTextMessage("当前message是:" + i); //发送消息 producer.send(message); //在本地打印消息 System.out.println("我现在发的消息是:" + message.getText()); } //关闭连接 connection.close(); } }
(4)编写消费者代码
package com.hnu.scw.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; / * @ Author :scw * @ Date :Created in 上午 11:30 2018/7/14 0014 * @ Description:消息消费者 * @ Modified By: * @Version: $version$ */ public class MessageConsumer { //通过集群的方式进行消息服务器的管理(failover就是进行动态转移,当某个服务器宕机, // 那么就进行其他的服务器选择,randomize表示随机选择) private static final String ACTIVEMQ_URL = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true"; //定义发送消息的队列名称 private static final String QUEUE_NAME = "MyMessage"; public static void main(String[] args) throws JMSException { //创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //创建连接 Connection connection = activeMQConnectionFactory.createConnection(); //打开连接 connection.start(); //创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建队列目标 Destination destination = session.createQueue(QUEUE_NAME); //创建消费者 javax.jms.MessageConsumer consumer = session.createConsumer(destination); //创建消费的监听 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("获取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } }
(5)进行查看各自的服务器的消息队列的情况。
- 首先,是要确保三个ActiveMQ服务器都进行打开。分析:当三个都服务都运行之后,我们从浏览器运行各自的地址,会发现:
比如:我这里的三个服务的地址分别如下:
- http://127.0.0.1:8161/
- http://127.0.0.1:8162/
- http://127.0.0.1:8163/
重点
其他的消息中间件
其实,类似ActiveMQ这样的消息中间件,用得比较多的还有就是RabbitMQ和Kafka。它们三者各自有各自的优势。大家可以百度进行了解,我就不进行多说了。后面我会同样把这两种消息中间件的使用进行详细的讲解,欢迎大家的关注哦~总的来说,只有适合的场景对应的消息中间件才能发挥最大的作用,没有一种是只有好处而没有坏处的~
#总结
- 主要是对消息中间件的基础知识进行讲解。
- 主要讲解ActiveMQ的使用
- 主要讲解了关于ActiveMQ的集群的搭建
- 稍微提到了类似ActiveMQ消息中间件的其他中间件
- 我所讲述的内容,够大家进行入门了,如果要进行深入的了解还是需要慢慢的去熟悉和学习的,而且消息中间件是非常重要的一个技术,希望大家去好好的了解。
- 最后,感谢各位的阅读哦~~~~
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/226155.html原文链接:https://javaforall.net
