RabbitMQ
创建一个生产者:
第二步:创建通信通道,相当于TCP的虚拟连接。之所以有这一步,是因为物理连接的开启成本开销很大,当有多个任务同时进行的时候就非常慢。所以这里就有了通道的概念,相当于在当前物理连接中开辟多个虚拟的通信管道,通过通道进行传输。
Channel channel = conn.createChannel(); channel.queueDeclare(“helloworld”, false, false, false, null);
创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
第三步:发送数据 BasecPublish是进行数据的发送,他也有四个参数。
String message = “nihaoa”; //要发送的消息 channel.basicPublish("" ,” helloworld” ,null , message.getBytes());
创建一个消费者:
第三步:创建一个消息消费者,并签收。
channel.basicConsume(“helloworld”, false, new Reciver(channel));
class Reciver extends DefaultConsumer{ private Channel channel; //重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到 public Reciver(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String messageBody = new String(body); //传入的Body就是从通道中收到的字节数组 System.out.println("消费者接收到:" + messageBody); //签收消息,确认消息 //envelope.getDeliveryTag() 获取这个消息的TagId,也就是这个通道的ID //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息 channel.basicAck(envelope.getDeliveryTag() , false); } }
在消费者这端我们不要close通道,因为消费者会一直循环监听消费。
⑦RabbitMQ六种工作模式
- Hello world
也就是点对点模式,一个生产者对应一个消费者,生产一个消费一个。
- 工作队列
一个生产者生产消息放入工作队列,由多个消费者进行处理。期间可能会按照权重、工作时间等把消息给不同的消费者消费。一般适合于集群。

- 发布订阅模式
中间有有一个交换器,他的作用是将我们的数据按照一定的规则分发给多个消费者,他和工作队列的区别是他会把数据产生多个副本发给不同的消费者。例如下面:c1和c2是完全相同的数据。适合于视频网站,当一个发布者发布消息后,所有的消费者接受到的都是相同的消息进行消费。

- 路由器模式
和发布订阅模式相比,消费者不再是收到相同的数据,而是数据通过交换机有选择的把数据消息分给不同的消费者。他有个缺点,就是需要对数据进行精准匹配,面对一些模糊查询就没辙了。

- 主题模式
解决了路由器模式的缺点,定义一个表达式规则,让消息根据表达式规则发送给不同的消费者。

- RPC模式
远程过程调用。不是MQ主要负责的东西。
生产者:发送200个对象给MQ,把对象转成json字符串发送给MQ。
public class OrderSystem { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null); for(int i = 100 ; i <= 200 ; i++) { SMS sms = new SMS("乘客" + i, "" + i, "您的车票已预订成功"); String jsonSMS = new Gson().toJson(sms); channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , jsonSMS.getBytes()); } System.out.println("发送数据成功"); channel.close(); connection.close(); } }

对象转字符串和字符串转对象,就要用到谷歌的gson这个工具包。我们把生产者生产的对象通过gson转成json格式的字符串发布出去。
消费者(3个,这里就写一个):以前我们是实现一个继承了DefaultConsumer类的类作为参数传入,现在我们直接用DefaultConsumer当内部类来使用。
public class SMSSender1 { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null); channel.basicQos(1);//处理完一个取一个 channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String jsonSMS = new String(body); System.out.println("SMSSender1-短信发送成功:" + jsonSMS); Thread.sleep(10); //本来有try\catch 用这个模拟不同消费者的快慢 channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
如果不写basicQos,则自动MQ会将所有请求平均发送给所有消费者,我们消费者也会有处理速度快慢的说法,为了不让处理快的消费者一直等待,就要加这个。basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的,如果是basicQos(10)则就是处理完10个再从消息队列里面拿。
第一步:在RabbitMQ的可视化管理控制中新建一个Exchanges。选定交换机绑定的虚拟主机,设置名字weather,类型为fanout(针对于发布订阅的)等等,包括持久化选项等等。
第二步:创建生产者。这里有个区别了,就是在发布订阅模式中,我们basicPublish方法第一个参数之前是空,那是因为没有交换机,现在有交换机了这里就要写交换机的名字。其次第二个参数本来是队列的名称,因为这里我们生产者不是直接和队列进行交互,是和交换机进行交互的,所以这里为空即可。
public class WeatherBureau { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitUtils.getConnection(); String input = new Scanner(System.in).next(); //手动输入天气情况 Channel channel = connection.createChannel(); channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER,"" , null , input.getBytes()); channel.close(); connection.close(); } }
RabbitConstant.EXCHANGE_WEATHER是之前定义好的常量,即交换机名字weather。所以此时,在生产者这边,就没有创建队列了,直接把生产者和交换机进行绑定。
第三步:创建消费者(百度和新浪,这里只给了百度的例子,新浪是一样的),在消费者中让队列和交换机绑定。
public class Baidu { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到) channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, ""); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("百度收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
生产者:
这里代码我就写有区别的地方,这里路由模式下,生产者方也不用创建队列,只需要和交换机绑定,但是绑定代码basicPublish有区别:
channel.basicPublish(“weather“,me.getKey() , null , me.getValue().getBytes());
这里的第二个参数不再是null了,而是me.getKey(),其中me是之前定义的一个迭代器,相当于一个map,这里的第二个参数就是数据筛选的条件。
消费者:
区别点在绑定的queueBing里的第三个参数,是路由的key。这里强调一下,就是在生产者这边创建的队列,可以绑定到多个交换机上,同理一个交换机也可以绑定多个队列。
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.shandong.qingdao.");
第一步:还是在可视化控制台中创建topic模式下的交换机。交换机名字设为topic
对于解绑,我们可以在可视化控制台中进行解绑,也可以调用queueUnbind()方法。
(4)配置镜像集群 在m2服务器上执行命令将与m1服务器进行复制。(也就是把m2加入到m1集群中)
CMQ
①CMQ队列消费模式代码
# 从腾讯云官网查看云api的密钥信息 secretId = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' secretKey = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxx' # 使用地域的消息服务 endpoint = 'xxxxxxxxxxxxxxxxxxxxxxxxxxx' # Account类对象不是线程安全的,如果多线程使用,需要每个线程单独初始化Account类对象 my_account = Account(endpoint, secretId, secretKey, debug=True) my_account.set_log_level(logging.DEBUG) queue_name = "GZ-QualPlat-ThresholdsEvent-Queue" my_queue = my_account.get_queue(queue_name) recv_msg = my_queue.receive_message() print "Receive Message Succeed! MessageBody:%s" % (recv_msg.msgBody) #register(recv_msg.msgBody) 注册到数据库消费 my_queue.delete_message(recv_msg.receiptHandle)
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/203038.html原文链接:https://javaforall.net
