ActiveMQ发布订阅模式

ActiveMQ发布订阅模式Topic 主题发布和订阅消息前面讲的案例都是点对点的消息 即一个生产者发送的一条消息只能被一个消费者消费 然后就移除了 而 topic 模式一条消息可以被多个消费者订阅 关系如下 定义生产者 packagecom dpb topic importjavax jms Connection importjavax jms ConnectionFa importjavax

Topic主题发布和订阅消息

定义生产者

package com.dpb.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import org.apache.activemq.ActiveMQConnectionFactory; / * ActiveMQ中的生产者(Producer) * @author dengp * */ public class MyProducer { 
    public void sendhello2ActiveMq(String messageText) { 
    TopicSession session = null; TopicConnection conn = null; try { 
    TopicConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616"); conn = factory.createTopicConnection(); conn.start(); session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // 创建消息队列 Topic topic = session.createTopic("test-topic"); // 创建消息发送者 TopicPublisher publisher = session.createPublisher(topic); // 设置持久化模式 NON_PERSISTENT不开启 PERSISTENT 开启 默认是开启 publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("name", "波波烤鸭"); mapMessage.setString("address", "深圳"); publisher.send(mapMessage); // 提交会话 session.commit(); } catch (Exception e) { 
    e.printStackTrace(); System.out.println("访问ActiveMQ服务发生错误!!"); } finally { 
    try { 
    // 回收会话资源 if (null != session) session.close(); } catch (JMSException e) { 
    e.printStackTrace(); } try { 
    // 回收链接资源 if (null != conn) conn.close(); } catch (JMSException e) { 
    e.printStackTrace(); } } } } 

定义消费者

package com.dpb.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import org.apache.activemq.ActiveMQConnectionFactory; / * ActiveMQ中的消费者(Consumer) * @author dengp * */ public class MyConsumer { 
    public void reciveHelloFormActiveMq() { 
    TopicSession session = null; TopicConnection conn = null; try { 
    TopicConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616"); conn = factory.createTopicConnection(); conn.start(); session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // 创建消息队列 Topic topic = session.createTopic("test-topic"); // 创建消息接受者 MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { 
    @Override public void onMessage(Message msg) { 
    if (msg != null) { 
    MapMessage map = (MapMessage) msg; try { 
    System.out.println(map.getString("name") + "接收#" + map.getString("address")); } catch (JMSException e) { 
    e.printStackTrace(); } } } }); // 休眠100s再关闭 Thread.sleep(1000 * 100); // 提交会话 session.commit(); } catch (Exception e) { 
    e.printStackTrace(); System.out.println("访问ActiveMQ服务发生错误!!"); } finally { 
    try { 
    // 回收会话资源 if (null != session) session.close(); } catch (JMSException e) { 
    e.printStackTrace(); } try { 
    // 回收链接资源 if (null != conn) conn.close(); } catch (JMSException e) { 
    e.printStackTrace(); } } } } 

测试

先启动消费者,可以开启多个

public static void main(String[] args) { 
    MyConsumer con = new MyConsumer(); con.reciveHelloFormActiveMq(); } 

在这里插入图片描述
在这里插入图片描述
启动生产者




public static void main(String[] args) { 
    MyProducer pro = new MyProducer(); pro.sendhello2ActiveMq("你好啊...topic"); } 

在这里插入图片描述
在这里插入图片描述
好了本文介绍到此,下篇介绍ActiveMQ和Spring的整合




版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/226397.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月16日 下午11:00
下一篇 2026年3月16日 下午11:00


相关推荐

  • 淘宝开店经验 心得 攻略

    淘宝开店经验 心得 攻略淘宝省钱攻略 人人可以做到 2013 年 9 月 6 日过滤阳光发表评论 经常在网上看到很多介绍淘宝网购省钱的攻略 其实很多都是上什么返利网上购物 其实这种大部分省下的钱都是到了返利网的口袋的 到顾客手中的钱能有多少呢 今天我要介绍的就是一种方式就是淘宝官方的省钱工具 淘宝客 大家如果没有淘宝客帐号 可以去阿里妈妈官网上申请帐号 http www alimama co

    2025年10月21日
    8
  • 操作系统复习题最全(复习看它就够了!!!!!

    操作系统复习题最全(复习看它就够了!!!!!PTA 习题总结 非常主观的针对了当时我个人的情况 习题一

    2026年3月18日
    2
  • 微信小程序 小程序源码包括后台完整版分享

    微信小程序 小程序源码包括后台完整版分享需要的留邮箱免费发!版权归作者所有,任何形式转载请联系作者。作者:执波仔丶(csdn博客)最新收集的60个微信小程序源码分享+开发视频教程最新收集的60个微信小程序源码分享+开发视频教程最新收集的60个微信小程序源码分享+开发视频教程小程序源码使用教程(源码文件夹自带安装教程,本教程不一定通用):第一步:百度搜索下载微信开发者工具(最新1.02.1801081,支持mac)第二步:打开开发工具客…

    2022年7月20日
    25
  • 05_MyBatis动态SQL学习笔记

    05_MyBatis动态SQL学习笔记

    2021年7月10日
    101
  • ffmpeg hevc_ffmpeg视频解码

    ffmpeg hevc_ffmpeg视频解码本次目标:1)将容器中的音频码流和视频码流分离出来。2)针对mp4文件中的码流情况进行修复。解封装的基本过程:#include<stdio.h>#include”libavcodec/avcodec.h”#include”libavformat/avformat.h”//MPEG-TS文件解封装得到的码流可播放,MP4解封装得到的码流不可播放;//这与容器的封装方式有关。voiddemuxer(constchar*url){//初始化格式上下文

    2022年10月16日
    4
  • vue 实现父子组件传值和子父组件传值

    vue 实现父子组件传值和子父组件传值先上一张图,vue父子组件传值都用的图片。从张图入手了解如何传参。一、父组件1.引入子组件importrandomfrom”./child-random-paper”;2.注册子组件components:{random,},3.静态组件,循环体<liv-for=”(item,index)inselectedTypeQuestion”:key=”item.id”:ind…

    2022年6月6日
    55

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号