RabbitMQ与CMQ的使用与实战

RabbitMQ与CMQ的使用与实战RabbitMQ Rabbitmq 的启动和关闭 rabbitmq server 前台启动服务 rabbitmq server detached 后台启动服务 常用 rabbitmqctls 停止服务端口号是 5672 可视化端口 15672 Linux 中查看正在运行的端口号 netstat tulpn 终止与启动应用 Rabbitmqctls app 启动引用

RabbitMQ

创建一个生产者:

  第二步:创建通信通道,相当于TCP的虚拟连接。之所以有这一步,是因为物理连接的开启成本开销很大,当有多个任务同时进行的时候就非常慢。所以这里就有了通道的概念,相当于在当前物理连接中开辟多个虚拟的通信管道,通过通道进行传输。

 Channel channel = conn.createChannel(); channel.queueDeclare(“helloworld”, false, false, false, null); 

  创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列

  第三步:发送数据 BasecPublish是进行数据的发送,他也有四个参数。

String message = “nihaoa”; //要发送的消息 channel.basicPublish("" ,” helloworld” ,null , message.getBytes()); 
创建一个消费者:

第三步:创建一个消息消费者,并签收。

 channel.basicConsume(“helloworld”, false, new Reciver(channel)); 
 class Reciver extends DefaultConsumer{ private Channel channel; //重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到 public Reciver(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String messageBody = new String(body); //传入的Body就是从通道中收到的字节数组 System.out.println("消费者接收到:" + messageBody); //签收消息,确认消息 //envelope.getDeliveryTag() 获取这个消息的TagId,也就是这个通道的ID //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息 channel.basicAck(envelope.getDeliveryTag() , false); } } 

  在消费者这端我们不要close通道,因为消费者会一直循环监听消费。

⑦RabbitMQ六种工作模式

  1. Hello world
    也就是点对点模式,一个生产者对应一个消费者,生产一个消费一个。
    在这里插入图片描述




  2. 工作队列
    一个生产者生产消息放入工作队列,由多个消费者进行处理。期间可能会按照权重、工作时间等把消息给不同的消费者消费。一般适合于集群。
    在这里插入图片描述




  3. 发布订阅模式
    中间有有一个交换器,他的作用是将我们的数据按照一定的规则分发给多个消费者,他和工作队列的区别是他会把数据产生多个副本发给不同的消费者。例如下面:c1和c2是完全相同的数据。适合于视频网站,当一个发布者发布消息后,所有的消费者接受到的都是相同的消息进行消费。
    在这里插入图片描述




  4. 路由器模式
    和发布订阅模式相比,消费者不再是收到相同的数据,而是数据通过交换机有选择的把数据消息分给不同的消费者。他有个缺点,就是需要对数据进行精准匹配,面对一些模糊查询就没辙了。
    在这里插入图片描述




  5. 主题模式
    解决了路由器模式的缺点,定义一个表达式规则,让消息根据表达式规则发送给不同的消费者。
    在这里插入图片描述




  6. RPC模式
    远程过程调用。不是MQ主要负责的东西。

  生产者:发送200个对象给MQ,把对象转成json字符串发送给MQ。

 public class OrderSystem { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null); for(int i = 100 ; i <= 200 ; i++) { SMS sms = new SMS("乘客" + i, "" + i, "您的车票已预订成功"); String jsonSMS = new Gson().toJson(sms); channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , jsonSMS.getBytes()); } System.out.println("发送数据成功"); channel.close(); connection.close(); } } 

在这里插入图片描述
  对象转字符串和字符串转对象,就要用到谷歌的gson这个工具包。我们把生产者生产的对象通过gson转成json格式的字符串发布出去。

  消费者(3个,这里就写一个):以前我们是实现一个继承了DefaultConsumer类的类作为参数传入,现在我们直接用DefaultConsumer当内部类来使用。

public class SMSSender1 { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null); channel.basicQos(1);//处理完一个取一个 channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String jsonSMS = new String(body); System.out.println("SMSSender1-短信发送成功:" + jsonSMS); Thread.sleep(10); //本来有try\catch 用这个模拟不同消费者的快慢 channel.basicAck(envelope.getDeliveryTag() , false); } }); } } 

  如果不写basicQos,则自动MQ会将所有请求平均发送给所有消费者,我们消费者也会有处理速度快慢的说法,为了不让处理快的消费者一直等待,就要加这个。basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的,如果是basicQos(10)则就是处理完10个再从消息队列里面拿。

  第一步:在RabbitMQ的可视化管理控制中新建一个Exchanges。选定交换机绑定的虚拟主机,设置名字weather,类型为fanout(针对于发布订阅的)等等,包括持久化选项等等。

  第二步:创建生产者。这里有个区别了,就是在发布订阅模式中,我们basicPublish方法第一个参数之前是空,那是因为没有交换机,现在有交换机了这里就要写交换机的名字。其次第二个参数本来是队列的名称,因为这里我们生产者不是直接和队列进行交互,是和交换机进行交互的,所以这里为空即可。

public class WeatherBureau { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitUtils.getConnection(); String input = new Scanner(System.in).next(); //手动输入天气情况 Channel channel = connection.createChannel(); channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER,"" , null , input.getBytes()); channel.close(); connection.close(); } } 

  RabbitConstant.EXCHANGE_WEATHER是之前定义好的常量,即交换机名字weather。所以此时,在生产者这边,就没有创建队列了,直接把生产者和交换机进行绑定。

  第三步:创建消费者(百度和新浪,这里只给了百度的例子,新浪是一样的),在消费者中让队列和交换机绑定。

public class Baidu { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到) channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, ""); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("百度收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } } 

  生产者:
  这里代码我就写有区别的地方,这里路由模式下,生产者方也不用创建队列,只需要和交换机绑定,但是绑定代码basicPublish有区别:

channel.basicPublish(“weather“,me.getKey() , null , me.getValue().getBytes()); 

  这里的第二个参数不再是null了,而是me.getKey(),其中me是之前定义的一个迭代器,相当于一个map,这里的第二个参数就是数据筛选的条件。

  消费者:
  区别点在绑定的queueBing里的第三个参数,是路由的key。这里强调一下,就是在生产者这边创建的队列,可以绑定到多个交换机上,同理一个交换机也可以绑定多个队列。

channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.shandong.qingdao."); 

  第一步:还是在可视化控制台中创建topic模式下的交换机。交换机名字设为topic

  对于解绑,我们可以在可视化控制台中进行解绑,也可以调用queueUnbind()方法。

(4)配置镜像集群 在m2服务器上执行命令将与m1服务器进行复制。(也就是把m2加入到m1集群中)

CMQ

①CMQ队列消费模式代码

# 从腾讯云官网查看云api的密钥信息 secretId = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' secretKey = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxx' # 使用地域的消息服务 endpoint = 'xxxxxxxxxxxxxxxxxxxxxxxxxxx' # Account类对象不是线程安全的,如果多线程使用,需要每个线程单独初始化Account类对象 my_account = Account(endpoint, secretId, secretKey, debug=True) my_account.set_log_level(logging.DEBUG) queue_name = "GZ-QualPlat-ThresholdsEvent-Queue" my_queue = my_account.get_queue(queue_name) recv_msg = my_queue.receive_message() print "Receive Message Succeed! MessageBody:%s" % (recv_msg.msgBody) #register(recv_msg.msgBody) 注册到数据库消费 my_queue.delete_message(recv_msg.receiptHandle) 
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/203038.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月19日 下午10:43
下一篇 2026年3月19日 下午10:44


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号