Spring Boot整合RabbitMQ详细教程

Spring Boot整合RabbitMQ详细教程1.首先我们简单了解一下消息中间件的应用场景异步处理场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种1.串行的方式;2.并行的方式(1)串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西.(2)并行方式:将注册信…

大家好,又见面了,我是你们的朋友全栈君。

1.首先我们简单了解一下消息中间件的应用场景

 

异步处理

场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种1.串行的方式;2.并行的方式 
(1)串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西. 
这里写图片描述
(2)并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。 
这里写图片描述 
假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并性已经提高的处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,应该是写入数据库后就返回. 
(3)消息队列 
引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理 
这里写图片描述 
由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍。

 应用解耦

场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口. 
这里写图片描述 
这种做法有一个缺点:

  • 当库存系统出现故障时,订单就会失败。
  • 订单系统和库存系统高耦合. 
    引入消息队列 
    这里写图片描述

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

  • 库存系统:订阅下单的消息,获取下单消息,进行库操作。 
    就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。

流量削峰

流量削峰一般在秒杀活动中应用广泛 
场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。 
作用: 
1.可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^) 
2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单) 
这里写图片描述 
1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面. 

2.秒杀业务根据消息队列中的请求信息,再做后续处理.

以上内容的来源是:https://blog.csdn.net/whoamiyang/article/details/54954780,在此感谢

2.各种消息中间件性能的比较:

TPS比较 一ZeroMq 最好,RabbitMq 次之, ActiveMq 最差。

持久化消息比较—zeroMq不支持,activeMq和rabbitMq都支持。持久化消息主要是指:MQ down或者MQ所在的服务器down了,消息不会丢失的机制。

可靠性、灵活的路由、集群、事务、高可用的队列、消息排序、问题追踪、可视化管理工具、插件系统、社区—RabbitMq最好,ActiveMq次之,ZeroMq最差。

高并发—从实现语言来看,RabbitMQ最高,原因是它的实现语言是天生具备高并发高可用的erlang语言。

综上所述:RabbitMQ的性能相对来说更好更全面,是消息中间件的首选。

3.接下来我们在springboot当中整合使用RabbitMQ

第一步:导入maven依赖

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
	<version>1.5.2.RELEASE</version>
</dependency>

第二步:在application.properties文件当中引入RabbitMQ基本的配置信息

#对于rabbitMQ的支持
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

第三步:编写RabbitConfig类,类里面设置很多个EXCHANGE,QUEUE,ROUTINGKEY,是为了接下来的不同使用场景。

/**
Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输, 
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 
Queue:消息的载体,每个消息都会被投到一个或多个队列。 
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来. 
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。 
vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。 
Producer:消息生产者,就是投递消息的程序. 
Consumer:消息消费者,就是接受消息的程序. 
Channel:消息通道,在客户端的每个连接里,可建立多个channel.
*/
@Configuration
public class RabbitConfig {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;


    public static final String EXCHANGE_A = "my-mq-exchange_A";
    public static final String EXCHANGE_B = "my-mq-exchange_B";
    public static final String EXCHANGE_C = "my-mq-exchange_C";


    public static final String QUEUE_A = "QUEUE_A";
    public static final String QUEUE_B = "QUEUE_B";
    public static final String QUEUE_C = "QUEUE_C";

    public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
    public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";
    public static final String ROUTINGKEY_C = "spring-boot-routingKey_C";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    //必须是prototype类型
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }
}

第四步:编写消息的生产者

@Component
public class MsgProducer implements RabbitTemplate.ConfirmCallback {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    //由于rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入
    private RabbitTemplate rabbitTemplate;
    /**
     * 构造方法注入rabbitTemplate
     */
    @Autowired
    public MsgProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
    }

    public void sendMsg(String content) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        //把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId);
    }
    /**
     * 回调
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        logger.info(" 回调id:" + correlationData);
        if (ack) {
            logger.info("消息成功消费");
        } else {
            logger.info("消息消费失败:" + cause);
        }
    }
}

第五步:把交换机,队列,通过路由关键字进行绑定,写在RabbitConfig类当中

 /**
     * 针对消费者配置
     * 1. 设置交换机类型
     * 2. 将队列绑定到交换机
     FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
     HeadersExchange :通过添加属性key-value匹配
     DirectExchange:按照routingkey分发到指定队列
     TopicExchange:多关键字匹配
     */
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(EXCHANGE_A);
    }
 /**
     * 获取队列A
     * @return
     */
    @Bean
    public Queue queueA() {
        return new Queue(QUEUE_A, true); //队列持久
    }
    @Bean
    public Binding binding() {

        return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
    }

一个交换机可以绑定多个消息队列,也就是消息通过一个交换机,可以分发到不同的队列当中去。

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
    }
    @Bean
    public Binding bindingB(){
        return BindingBuilder.bind(queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_B);
    }

第六步:编写消息的消费者,这一步也是最复杂的,因为可以编写出很多不同的需求出来,写法也有很多的不同。

    比如一个生产者,一个消费者

@Component
@RabbitListener(queues = RabbitConfig.QUEUE_A)
public class MsgReceiver {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @RabbitHandler
    public void process(String content) {
        logger.info("接收处理队列A当中的消息: " + content);
    }

}

Spring Boot整合RabbitMQ详细教程

比如一个生产者,多个消费者,可以写多个消费者,并且他们的分发是负载均衡的。

@Component
@RabbitListener(queues = RabbitConfig.QUEUE_A)
public class MsgReceiverC_one {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @RabbitHandler
    public void process(String content) {
            logger.info("处理器one接收处理队列A当中的消息: " + content);
    }
}
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_A)
public class MsgReceiverC_two {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @RabbitHandler
    public void process(String content) {    
            logger.info("处理器two接收处理队列A当中的消息: " + content);
    }

}

Spring Boot整合RabbitMQ详细教程

另外一种消息处理机制的写法如下,在RabbitMQConfig类里面增加bean:

    @Bean
    public SimpleMessageListenerContainer messageContainer() {
        //加载处理消息A的队列
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        //设置接收多个队列里面的消息,这里设置接收队列A
        //假如想一个消费者处理多个队列里面的信息可以如下设置:
        //container.setQueues(queueA(),queueB(),queueC());
        container.setQueues(queueA());
        container.setExposeListenerChannel(true);
        //设置最大的并发的消费者数量
        container.setMaxConcurrentConsumers(10);
        //最小的并发消费者的数量
        container.setConcurrentConsumers(1);
        //设置确认模式手工确认
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                /**通过basic.qos方法设置prefetch_count=1,这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message,
                 换句话说,在接收到该Consumer的ack前,它不会将新的Message分发给它 */
                channel.basicQos(1);
                byte[] body = message.getBody();
                logger.info("接收处理队列A当中的消息:" + new String(body));
                /**为了保证永远不会丢失消息,RabbitMQ支持消息应答机制。
                 当消费者接收到消息并完成任务后会往RabbitMQ服务器发送一条确认的命令,然后RabbitMQ才会将消息删除。*/
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        });
        return container;
    }

下面是当一个消费者,处理多个队列里面的信息打印的log

Spring Boot整合RabbitMQ详细教程

 

Fanout Exchange

Fanout 就是我们熟悉的广播模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。

    //配置fanout_exchange
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(RabbitConfig.FANOUT_EXCHANGE);
    }

    //把所有的队列都绑定到这个交换机上去
    @Bean
    Binding bindingExchangeA(Queue queueA,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueA).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchangeB(Queue queueB, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueB).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchangeC(Queue queueC, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueC).to(fanoutExchange);
    }

消息发送,这里不设置routing_key,因为设置了也无效,发送端的routing_key写任何字符都会被忽略。

public void sendAll(String content) {
        rabbitTemplate.convertAndSend("fanoutExchange","", content);
    }

消息处理的结果如下所示:

Spring Boot整合RabbitMQ详细教程

 

 

 

 

 

 

 

 

 

 

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/146318.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 手机里实现图片文字识别的实用方法[通俗易懂]

    手机里实现图片文字识别的实用方法[通俗易懂]突然接到老板给的一个任务——把一篇文章排版出来,你会怎样做?是一个字一个字手动手动输入呢?还是语音识别呢?当然,这两种方法都可行,但是不够简单方便。手动输入太慢,语音识别又有点麻烦,如果普通话不好,识别很可能会出错。那什么方法,实现图片文字识别最简单,最方便,还很精确呢?今天就来和大家分享一下,手机里的逆天黑科技,实现图片文字识别,只需5秒钟!方法一:打开QQ,左上角【扫一扫】——手机…

    2022年6月5日
    37
  • rfid-rc522使用教程_RFID读写方式是什么

    rfid-rc522使用教程_RFID读写方式是什么文章目录1、RC522驱动原理2、手机APP查看卡信息3、驱动移植4、读写卡1、RC522驱动原理2、手机APP查看卡信息这里我参考的这篇博主的文章,讲的还是蛮详细的:https://blog.csdn.net/Ikaros_521/article/details/115958888使用的APP如下所示:可以按照软件操作读取一个卡的信息如下所示,可以看到他是16个扇区,每个扇区有四个块,所以就一共是64块,储存的信息数量是1024byte,计算方式为16416=1024。下面的几页信息如

    2022年9月19日
    1
  • platform device 与 platform driver

    platform device 与 platform driver做Linux方面也有三个多月了,对代码中的有些结构一直不是很明白,比如platform_device与platform_driver一直分不清关系。在网上搜了下,做个总结。两者的工作顺序是先定义platform_device->注册platform_device->,再定义

    2022年7月24日
    7
  • ICMP报文详解

    ICMP报文详解概述ICMP允许主机或路由报告差错情况和提供有关异常情况。ICMP是因特网的标准协议,但ICMP不是高层协议,而是IP层的协议。通常ICMP报文被IP层或更高层协议(TCP或UDP)使用。一些ICMP报文把差错报文返回给用户进程。ICMP报文作为IP层数据报的数据,加上数据报的首部,组成数据报发送出去。ICMP报文的种类有两种,即ICMP差错报告报文和ICMP询问报文。ICMP报文的格式类型:占8位代码:占8位检验和:占16位说明:ICMP所有报文的前4个.

    2022年6月11日
    511
  • 伪静态规则写法RewriteRule-htaccess详细语法使用

    伪静态规则写法RewriteRule-htaccess详细语法使用伪静态实际上是利用PHP把当前地址解析成另一种方法来访问网站,要学伪静态规则的写法,要懂一点正则一、正则表达式教程有一个经典的教程:正则表达式30分钟入门教程常用正则如下:.换行符以外的所有字符\w

    2022年7月4日
    18
  • js 判断对象是否为空

    js 判断对象是否为空js判断对象是否为空的四种方法

    2022年5月30日
    38

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号