线程池与mq的简单结合使用

线程池与mq的简单结合使用线程池与mq的简单结合使用

大家好,又见面了,我是你们的朋友全栈君。本文考虑发送方和接收方有多个线程发布消息和多个线程接收消息的情况:
 

1.生产者

package com.activemq3;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息生产者-消息发布者(多线程发送)
 * 
 */
public class JMSProducerThread {

    private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
    private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
    private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
    private static final int SENDNUM=10; // 发送的消息数量
    ConnectionFactory connectionFactory=null; // 连接工厂
    private Connection connection = null;
    private Session session = null;
    private Destination destination=null; // 消息的目的地
    public void init(){
        // 实例化连接工厂
        connectionFactory=new ActiveMQConnectionFactory(JMSProducerThread.USERNAME, JMSProducerThread.PASSWORD, JMSProducerThread.BROKEURL);
        try {
            connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
            connection.start();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    public void produce(){
        try {
            MessageProducer messageProducer; // 消息生产者
            session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session
            destination=session.createQueue("queue1");
            messageProducer=session.createProducer(destination); // 创建消息生产者
            for(int i=0;i<JMSProducerThread.SENDNUM;i++){
                TextMessage message=session.createTextMessage("ActiveMQ中"+Thread.currentThread().getName()+"线程发送的数据"+":"+i);
                System.out.println(Thread.currentThread().getName()+"线程"+"发送消息:"+"ActiveMQ 发布的消息"+":"+i);
                messageProducer.send(message);
                session.commit();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }


    /**
     * 发送消息
     * @param session
     * @param messageProducer
     * @throws Exception
     */
    public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
        for(int i=0;i<JMSProducerThread.SENDNUM;i++){
            TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i);
            System.out.println("发送消息:"+"ActiveMQ 发布的消息"+i);
            messageProducer.send(message);
        }
    }

}

2.消费者

package com.activemq3;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息生产者-消息发布者(多线程发送消息)
 *
 */
public class JMSConsumerThread {

    private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
    private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
    private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址

    ConnectionFactory connectionFactory=null; // 连接工厂
    private Connection connection = null;
    private Session session = null;
    private Destination destination=null; // 消息的目的地
    public void init(){
        // 实例化连接工厂
        connectionFactory=new ActiveMQConnectionFactory(JMSConsumerThread.USERNAME, JMSConsumerThread.PASSWORD, JMSConsumerThread.BROKEURL);
        try {
            connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
            connection.start();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    public void consumer(){
        MessageConsumer messageConsumer; // 消息的消费者

        try {
            session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
            destination=session.createQueue("queue1");
            messageConsumer=session.createConsumer(destination); // 创建消息消费者
            messageConsumer.setMessageListener(new Listener3()); // 注册消息监听
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }


}

package com.activemq3;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 消息监听(多线程接收)
 *
 */
public class Listener3 implements MessageListener{
    private ExecutorService threadPool =Executors.newFixedThreadPool(8);
    @Override
    public void onMessage(final Message message) {
    threadPool.execute(new Runnable() {

            @Override
            public void run() {
                try {
                    try {
                        Thread.sleep(new Random().nextInt(2)*500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("接收线程..."+Thread.currentThread().getName()+"收到的消息是:"+((TextMessage)message).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }               
            }
        }); 
    }
}

生产者执行发送的主函数:
 

在main方法中创建了一个固定3个 线程的线程池,去处理5个线程任务。

package com.activemq3;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * 多线程发送消息
 */

public class MainProducer {
          public static void main(String[] args) {

              ExecutorService threadPool=Executors.newFixedThreadPool(3);
              for (int i = 0; i < 5; i++) {
                  threadPool.submit(new Runnable() {

                    @Override
                    public void run() {
                        JMSProducerThread jph=new JMSProducerThread();
                        try {
                            Thread.sleep(new Random().nextInt(5)*500);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        jph.init();
                        jph.produce();                      
                    }
                });

            }
             /* 
            for (int i = 0; i <5; i++) {

               new Thread(new Runnable() {

                @Override
                public void run() {
                    JMSProducerThread js=new JMSProducerThread();
                    try {
                        Thread.sleep(new Random().nextInt(5)*500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    js.init();
                    js.produce();

                }
            }).start();

            }*/         
            }
}

消费者执行发送的主函数:
 

消费者中监听器中创建了固定8个线程的线程池去接收消息。

package com.activemq3;
public class MainConsumer {
          public static void main(String[] args) {
            JMSConsumerThread jch=new JMSConsumerThread();
            jch.init();
            jch.consumer();             
  }
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/106177.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 装水的容器大全_盛水的容器什么最好

    装水的容器大全_盛水的容器什么最好原题链接给你 n 个非负整数 a1,a2,…,an,每个数代表坐标中的一个点 (i, ai) 。在坐标内画 n 条垂直线,垂直线 i 的两个端点分别为 (i, ai) 和 (i, 0) 。找出其中的两条线,使得它们与 x 轴共同构成的容器可以容纳最多的水。说明:你不能倾斜容器。示例 1:输入:[1,8,6,2,5,4,8,3,7]输出:49解释:图中垂直线代表输入数组 [1,8,6,2,5,4,8,3,7]。在此情况下,容器能够容纳水(表示为蓝色部分)的最大值为 49。示例 2:输入:he

    2022年8月8日
    1
  • 过压保护(2)_过压保护值和欠压保护值

    过压保护(2)_过压保护值和欠压保护值http://www.elecfans.com/dianlutu/protect/2009102499242.html采用CW136构成的过压保护电路图中是采用CW136构成的过压保护电路,当电子设备

    2022年8月2日
    8
  • ffmpeg安装教程win10_nginx菜鸟教程

    ffmpeg安装教程win10_nginx菜鸟教程简述作为一个计算机方面的小白,对ffmpeg其实没多少了解,只是因为在合并音频和视频要使用到ffmpeg这个工具,所以才下载下来,所以就是一个简单的安装教程。话不多说开始安装吧。下载百度网盘可能有兄弟访问github不是很给力,直接下载这个也是可以的链接:https://pan.baidu.com/s/1Z7VkOv-_PAub6OfDkyly4Q提取码:yj5e官网下载来到官网下载点击跳转来到下载主页点击这个进入github,找到资源下载即可下载这个也可以,我下载的时候出现了很

    2022年9月13日
    3
  • python pyc文件解析_pyc文件

    python pyc文件解析_pyc文件codeobject¶在我们导入python脚本时在目录下会生成个一个相应的pyc文件,是pythoncodeobj的持久化储存形式,加速下一次的装载。文件结构¶pyc文件由三大部分组成最开始4个字节是一个Maigcint,标识此pyc的版本信息接下来四个字节还是个int,是pyc产生的时间序列化的PyCodeObject,结构参照include/code.h,序列化方法pyth…

    2022年6月29日
    39
  • 分布式锁的实现和应用场景_predis分布式锁的应用

    分布式锁的实现和应用场景_predis分布式锁的应用文章目录如何理解分布式锁分布式锁的常用实现基于关系型数据库存在单点故障风险不可重入无法实现阻塞应用Redis缓存基于ZooKeeper实现电商网站都会遇到秒杀、特价之类的活动,大促活动有一个共同特点就是访问量激增,在高并发下会出现成千上万人抢购一个商品的场景。虽然在系统设计时会通过限流、异步、排队等方式优化,但整体的并发还是平时的数倍以上,参加活动的商品一般都是限量库存,如何防止库存超卖,避免并发问题呢?分布式锁就是一个解决方案。如何理解分布式锁我们都知道,在业务开发中,为了保证在多线程下处理

    2022年9月7日
    2
  • pytest-allure_什么是思想报告

    pytest-allure_什么是思想报告前言allure是一个report框架,支持java的Junit/testng等框架,当然也可以支持python的pytest框架,也可以集成到Jenkins上展示高大上的报告界面。mac环境:

    2022年7月29日
    6

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号