RabbitMQ入门:工作队列(Work Queue)

在上一篇博客《RabbitMQ入门:HelloRabbitMQ代码实例》中,我们通过指定的队列发送和接收消息,代码还算是比较简单的。假设有这一些比较耗时的任务,按照上一次的那种方式,我们要一直等

大家好,又见面了,我是全栈君。

 

在上一篇博客《RabbitMQ入门:Hello RabbitMQ 代码实例》中,我们通过指定的队列发送和接收消息,代码还算是比较简单的。

假设有这一些比较耗时的任务,按照上一次的那种方式,我们要一直等前面的耗时任务完成了之后才能接着处理后面耗时的任务,那要等多久才能处理完?别担心,我们今天的主角–工作队列就可以解决该问题。我们将围绕下面这个索引展开:

  1. 什么是工作队列
  2. 代码准备
  3. 循环分发
  4. 消息确认
  5. 公平分发
  6. 消息持久化

废话少说,直接展开。

一、什么是工作队列

工作队列–用来将耗时的任务分发给多个消费者(工作者),主要解决这样的问题:处理资源密集型任务,并且还要等他完成。有了工作队列,我们就可以将具体的工作放到后面去做,将工作封装为一个消息,发送到队列中,一个工作进程就可以取出消息并完成工作。如果启动了多个工作进程,那么工作就可以在多个进程间共享。

二、代码准备

  1. 生产者类:NewTask.java
    public class NewTask {
        //队列名称
        public static final String QUEUE_NAME = "TASK_QUEUE";
        //队列是否需要持久化
        public static final boolean DURABLE = false;
        
        //需要发送的消息列表
        public static final String[] msgs = {"task 1", "task 2", "task 3", "task 4", "task 5", "task 6"};
        
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
    
            try {
                // 1.connection & channel
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.queue
                channel.queueDeclare(QUEUE_NAME, DURABLE, false, false, null);
    
                // 3.publish msg
                for (int i = 0; i < msgs.length; i++) {
                    channel.basicPublish("", QUEUE_NAME, null, msgs[i].getBytes());
                    System.out.println("** new task ****:" + msgs[i]);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
    
            }
    
        }
    }

     

  2. 消费者类:Work.java
    public class Work {
    
        public static void main(String[] args) {
            System.out.println("*** Work ***");
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
    
            try {
                //1.connection & channel
                final Channel channel = factory.newConnection().createChannel();
                
                //2.queue
                channel.queueDeclare(NewTask.QUEUE_NAME, NewTask.DURABLE, false, false, null);
    
                //3. consumer instance
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException {
                        String msg = new String(body, "UTF-8");
                        //deal task
                        doWork(msg);
    
                    }
                };
                
                //4.do consumer
                boolean autoAck = true;
                channel.basicConsume(NewTask.QUEUE_NAME, autoAck, consumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    
        private static void doWork(String msg) {
            try {
                System.out.println("**** deal task begin :" + msg);
                
                //假装task比较耗时,通过sleep()来模拟需要消耗的时间
                if ("sleep".equals(msg)) {
                    Thread.sleep(1000 * 60);
                } else {
                    Thread.sleep(1000);
                }
    
                System.out.println("**** deal task finish :" + msg);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }

     

  3. 再来一个消费者类:Work2.java,代码同Work.java一模一样。

 

三、循环分发

我们先启动Work和Work2,然后启动NewTask,运行结果如下:

NewTask运行结果:

RabbitMQ入门:工作队列(Work Queue)

Work运行结果:

RabbitMQ入门:工作队列(Work Queue)

Work2运行结果:

 RabbitMQ入门:工作队列(Work Queue)

我们发现,消息生产者发送了6条消息,消费者work和work2分别分到了3个消息,而且是循环轮流分发到的,这种分发的方式就是循环分发。

四、消息确认

假如我们在发送的消息里面添加“sleep”

//需要发送的消息列表
    public static final String[] msgs = {"sleep", "task 1", "task 2", "task 3", "task 4", "task 5", "task 6"};

根据代码中的实现,这个sleep要耗时1分钟,万一在这1分钟之内,工作进程崩溃了或者被kill了,会发生什么情况呢?根据上面的代码:

//4.do consumer
            boolean autoAck = true;
            channel.basicConsume(NewTask.QUEUE_NAME, autoAck, consumer);

自动确认为true,每次RabbitMQ向消费者发送消息之后,会自动发确认消息(我工作你放心,不会有问题),这个时候消息会立即从内存中删除。如果工作者挂了,那将会丢失它正在处理和未处理的所有工作,而且这些工作还不能再交由其他工作者处理,这种丢失属于客户端丢失。

我们来验证下,和刚才的步骤一样执行程序:

1.NewTask的控制台打印结果:
** new task ****:sleep
** new task ****:task 1
** new task ****:task 2
** new task ****:task 3
** new task ****:task 4
** new task ****:task 5
** new task ****:task 6

2.Work的控制台打印结果:
**** deal task begin :sleep

3.Work2的控制台打印结果:
**** deal task begin :task 1
**** deal task finish :task 1
**** deal task begin :task 3
**** deal task finish :task 3
**** deal task begin :task 5
**** deal task finish :task 5

根据上面的内容,消息生产者发送了7条消息, work2消费了1、3、5 三条,那剩下的sleep、2、4、6 这四条消息肯定是work来处理,只是sleep耗时一分钟 ,时间差后面的还没来得及处理,这个时候我们kill掉work,去看下RabbitMQ 管理页面,没有未处理的消息,消息随着work被kill也跟着丢失了。

RabbitMQ入门:工作队列(Work Queue)

是不是很可怕?

为了应对这种情况,RabbitMQ支持消息确认。消费者处理完消息之后,会发送一个确认消息告诉RabbitMQ,消息处理完了,你可以删掉它了。

代码修改(Work.java和Work2.java同步修改):1.将自动确认改为false,2.消息处理之后再通过channel.basicAck进行消息确认

RabbitMQ入门:工作队列(Work Queue)

 修改完后,执行程序:

1.NewTask的控制台打印结果:
** new task ****:sleep
** new task ****:task 1
** new task ****:task 2
** new task ****:task 3
** new task ****:task 4
** new task ****:task 5
** new task ****:task 6

2.Work的控制台打印结果:
**** deal task begin :sleep

3.Work2的控制台打印结果:
**** deal task begin :task 1
**** deal task finish :task 1
**** deal task begin :task 3
**** deal task finish :task 3
**** deal task begin :task 5
**** deal task finish :task 5

然后kill掉work,去看RabbitMQ管理页面,会发现有4条未确认:

RabbitMQ入门:工作队列(Work Queue)

再去看下work2的控制台,work2将work未处理完和未来得及处理的消息都给处理了:

RabbitMQ入门:工作队列(Work Queue)

等work2处理完后,你再去看RabbitMQ管理页面,会发现页面的消息数值也都变成0 了。

 

五、公平分发

按照上面那种循环分发的方式,每个消费者会分到相同数量的任务,这样会有一个问题:假如有一些task非常耗时,之前的任务还没有完成,后面又来了那么多任务,来不及处理,那咋办? 有的消费者忙的不可开交,有的消费者却很快处理完事情然后无所事事浪费资源,那咋整?答案就是:公平分发。 怎么实现呢?

 发生上述问题的原因就是RabbitMQ收到消息后就立即分发出去,而没有确认各个工作者未返回确认的消息数量。因此我们可以使用basicQos方法,并将参数prefetchCount设为1,告诉RabbitMQ 我每次值处理一条消息,你要等我处理完了再分给我下一个。这样RabbitMQ就不会轮流分发了,而是寻找空闲的工作者进行分发。

代码修改(work和Work2同步修改):

RabbitMQ入门:工作队列(Work Queue)

执行代码:

1.NewTask的控制台打印结果:
** new task ****:sleep
** new task ****:task 1
** new task ****:task 2
** new task ****:task 3
** new task ****:task 4
** new task ****:task 5
** new task ****:task 6

2.Work的控制台打印结果:
**** deal task begin :sleep
**** deal task finish :sleep

3.Work2的控制台打印结果:
**** deal task begin :task 1
**** deal task finish :task 1
**** deal task begin :task 2
**** deal task finish :task 2
**** deal task begin :task 3
**** deal task finish :task 3
**** deal task begin :task 4
**** deal task finish :task 4
**** deal task begin :task 5
**** deal task finish :task 5
**** deal task begin :task 6
**** deal task finish :task 6

Work只处理了sleep,Work2处理了1、2、3、4、5、6 这个六条消息。

六、消息持久化

上面说到消息确认的时候,提到了工作者被kill的情况。那如果RabbitMQ被stop掉了呢?我们来看下:

这次只启动Work和NewTask,不启动Work2,所有消息都交给Work来处理,控制台打印信息:

1.NewTask的控制台打印结果:
** new task ****:sleep
** new task ****:task 1
** new task ****:task 2
** new task ****:task 3
** new task ****:task 4
** new task ****:task 5
** new task ****:task 6

2.Work的控制台打印结果:
**** deal task begin :sleep

在work处理sleep的过程中,我们停掉RabbitMQ服务

RabbitMQ入门:工作队列(Work Queue)

然后重新start服务并执行rabbitmq-plugins enable rabbitmq_management命令,然后查看管理页面:

RabbitMQ入门:工作队列(Work Queue)

你会发现,所有消息都将被清空了。这种丢失属于服务端丢失

因此需要将消息进行持久化来应对这种情况。

持久化需要做两件事情:

  1. 队列持久化,在声明队列的时候,将第二个参数设为trueRabbitMQ入门:工作队列(Work Queue)

          RabbitMQ入门:工作队列(Work Queue)

     另外,由于RabbitMQ不允许重新定义已经存在的队列,否则就会报错(上一篇博客中已经提到过了),因此我们将这次的队列名改下:RabbitMQ入门:工作队列(Work Queue)

     

  2. 消息持久化,在发送消息的时候,将第三个参数设为2RabbitMQ入门:工作队列(Work Queue)

然后运行代码,在work处理sleep的时候将服务停掉,并重新启动且执行rabbitmq-plugins enable rabbitmq_management命令,然后查看管理页面:

 RabbitMQ入门:工作队列(Work Queue)

一共7条消息,未确认的1条(sleep)和ready的6条(1、2、3、4、5、6)。消息被保存了下来。

 重新启动Work,所有消息被消费:

RabbitMQ入门:工作队列(Work Queue)

RabbitMQ入门:工作队列(Work Queue)

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/120898.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • abp架构详解_大数定律通俗理解

    abp架构详解_大数定律通俗理解网上有不少文章说ABP的模块,有的直接翻译自官网介绍,有的分析Modlue的源代码,有的写一通代码,没什么注释,很少有能通俗说清的。那么,有两个问题:1.ABP中的模块到底是什么?2.搞这个东西是干嘛

    2022年8月17日
    14
  • PHP中文字符串的查找与替换「建议收藏」

    PHP中文字符串的查找与替换「建议收藏」查找字符串中是否包含某个词组&amp;amp;amp;lt;?phpechostrpos(&amp;amp;quot;一二三四五&amp;amp;quot;,&amp;amp;quot;一&amp;amp;quot;);echo&amp;amp;quot;&amp;amp;amp;lt;br&amp;amp;amp;gt;&amp;amp;quot;;echostrpos(&amp;amp;quot;一二三

    2022年5月23日
    40
  • python中if的用法例子_python中while的意思是

    python中if的用法例子_python中while的意思是一、if判断语句if语句是用来进行判断的,其使用格式如下:if要判断的条件:条件成立时,要做的事情else这里写代码片否者要做的事情二、框图三、参考代码:chePiao=1ifchePiao==1:print("hell")else:print("hell…

    2022年9月26日
    1
  • 超详细VSCode安装教程(Windows)「建议收藏」

    本文基于Windows系统博主的VSCode专栏:分享使用VSCode的基本操作与各种技巧

    2022年4月4日
    149
  • 报错:“来自数据源的String类型的给定值不能转换为指定目标列的类型nvarchar。”「建议收藏」

    报错:“来自数据源的String类型的给定值不能转换为指定目标列的类型nvarchar。”「建议收藏」解决sqlserver批量插入时出现“来自数据源的String类型的给定值不能转换为指定目标列的类型nvarchar。”问题问题的原因:源的一个字段值长度超过了目标数据库字段的最大长度解决方法:扩大目标数据库对应字段的长度一般原因是源的字段会用空字符串填充,导致字符串长度很大,可以使用rtrim去除…

    2022年7月20日
    12
  • 使用cookie登录

    前言:什么是cookie?Cookie,指某些网站为了辨别用户身份、进行session跟踪而储存在用户本地终端上的数据(通常经过加密)。比如说有些网站需要登录后才能访问某个页面,在登录之前,你想抓取某个页面内容是不允许的。那么我们可以利用Urllib库保存我们登录的Cookie,然后再抓取其他页面,这样就达到了我们的目的。一、Urllib库简介Urllib是python内置的H…

    2022年4月7日
    179

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号