Topic主题发布和订阅消息
定义生产者
package com.dpb.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import org.apache.activemq.ActiveMQConnectionFactory; / * ActiveMQ中的生产者(Producer) * @author dengp * */ public class MyProducer {
public void sendhello2ActiveMq(String messageText) {
TopicSession session = null; TopicConnection conn = null; try {
TopicConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616"); conn = factory.createTopicConnection(); conn.start(); session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // 创建消息队列 Topic topic = session.createTopic("test-topic"); // 创建消息发送者 TopicPublisher publisher = session.createPublisher(topic); // 设置持久化模式 NON_PERSISTENT不开启 PERSISTENT 开启 默认是开启 publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("name", "波波烤鸭"); mapMessage.setString("address", "深圳"); publisher.send(mapMessage); // 提交会话 session.commit(); } catch (Exception e) {
e.printStackTrace(); System.out.println("访问ActiveMQ服务发生错误!!"); } finally {
try {
// 回收会话资源 if (null != session) session.close(); } catch (JMSException e) {
e.printStackTrace(); } try {
// 回收链接资源 if (null != conn) conn.close(); } catch (JMSException e) {
e.printStackTrace(); } } } }
定义消费者
package com.dpb.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import org.apache.activemq.ActiveMQConnectionFactory; / * ActiveMQ中的消费者(Consumer) * @author dengp * */ public class MyConsumer {
public void reciveHelloFormActiveMq() {
TopicSession session = null; TopicConnection conn = null; try {
TopicConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616"); conn = factory.createTopicConnection(); conn.start(); session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // 创建消息队列 Topic topic = session.createTopic("test-topic"); // 创建消息接受者 MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() {
@Override public void onMessage(Message msg) {
if (msg != null) {
MapMessage map = (MapMessage) msg; try {
System.out.println(map.getString("name") + "接收#" + map.getString("address")); } catch (JMSException e) {
e.printStackTrace(); } } } }); // 休眠100s再关闭 Thread.sleep(1000 * 100); // 提交会话 session.commit(); } catch (Exception e) {
e.printStackTrace(); System.out.println("访问ActiveMQ服务发生错误!!"); } finally {
try {
// 回收会话资源 if (null != session) session.close(); } catch (JMSException e) {
e.printStackTrace(); } try {
// 回收链接资源 if (null != conn) conn.close(); } catch (JMSException e) {
e.printStackTrace(); } } } }
测试
先启动消费者,可以开启多个
public static void main(String[] args) {
MyConsumer con = new MyConsumer(); con.reciveHelloFormActiveMq(); }


启动生产者
public static void main(String[] args) {
MyProducer pro = new MyProducer(); pro.sendhello2ActiveMq("你好啊...topic"); }


好了本文介绍到此,下篇介绍ActiveMQ和Spring的整合
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/226397.html原文链接:https://javaforall.net
