详解RocketMQ不同类型的消费者

详解RocketMQ不同类型的消费者

根据使用者对读取操作的控制情况,分为两种类型。一个是DefaultMQPushConsumer,由系统控制读取操作,收到消息后自动调用传入的处理方法来处理;另一个是DefaultMQPullConsumer,读取操作中的大部分功能由使用者自主控制。
1.DefaultMQPushConsumer的使用
使用DefaultMQPushConsumer主要是设置好各种参数和传入处理消息的函数。系统收到消息后自动调用处理函数来处理消息,自动保存Offset,而且加入新的DefaultMQPushConsumer后会自动做负载均衡。下面结合org.apache.rocketmq.example.quickstart包中的源码来介绍。
代码清单1-1 DefaultMQPushConsumer示例
public class QuickStart {

public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer Consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

Consumer.setNamesrvAddr(“name-server1-ip:9876;name-server2-ip:9876”);
Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

    Consumer.setMessageModel(MessageModel.BROADCASTING);

    Consumer.subscribe("TopicTest", "*");
    Consumer.registerMessageListener(new MessageListenerConcurrently() {
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,   ConsumeConcurrentlyContext context) {
            System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    Consumer.start();
}

}
DefaultMQPushConsumer需要设置三个参数:一是这个Consumer的GroupName,二是NameServer的地址和端口号,三是Topic的名称,下面详细介绍。
Consumer的GroupName用于把多个Consumer组织到一起,提高并发处理能力,GroupName需要和消息模式(MessageModel)配合使用。
RocketMQ支持两种消息模式:Clustering 和 Broadcasting。
 在Clustering 模式下,同一个ConsumerGroup(GroupName相同)里的每个Consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消费的内容合起来才是所订阅Topic内容的整体,从而达到负载均衡的目的。
 在Broadcasting模式下,同一个ConsumerGroup里的每个Consumer都能消费到所订阅Topic的全部消息,也就是一个消息会被多次分发,被多个Consumer消费。
NameServer的地址和端口号,可以填写多个,用分号隔开,达到消除单点故障的目的,比如 “ip1:port;ip2:port;ip3:port”。
Topic名称用来标识消息类型,需要提前创建。如果不需要消费某个Topic下的所有消息,可以通过指定消息的Tag进行消息过滤,比如:Consumer.subscribe(“TopicTest”, “tag1 || tag2 || tag3”),表示这个Consumer要消费“TopicTest”下带有tag1或tag2或tag3的消息(Tag是在发送消息时设置的标签)。在填写Tag参数的位置,用null或者“*”表示要消费这个Topic的所有消息。
2.DefaultMQPushConsumer的处理流程
本节通过分析源码来说明DefaultMQPushConsumer的处理流程。
DefaultMQPushConsumer主要功能实现在DefaultMQPushConsumerImpl类中,消息的处理逻辑是在pullMessage这个函数里的PullCallBack中。在PullCallBack函数里有个switch语句,根据从Broker返回的消息类型做相应的处理,具体处理逻辑可以查看源码。
代码清单1-2 DefaultMQPushConsuer的处理逻辑
switch (pullResult.getPullStatus()) {

case FOUND:
    …..
    break;
case NO_NEW_MSG:
    ……
    break;
case OFFSET_ILLEGAL:
    ……
    break;
default:
    break;

}
DefaultMQPushConsuer的源码中有很多PullRequest语句,比如DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest),为什么“PushConsumer”中使用“PullRequest”呢?这是通过“长轮询”方式达到Push效果的方法,长轮询方式既有Pull的优点,又兼具Push方式的实时性。
Push方式是Server端接收到消息后,主动把消息推送给Client端,实时性高。对于一个提供队列服务的Server来说,用Push方式主动推送有很多弊端;首先是加大Server端的工作量,进而影响Server的性能,其次Client的处理能力各不相同,Client的状态不受Server控制,如果Client不能及时处理Server推送过来的消息,会造成各种潜在问题。
Pull方式是Client端循环地从Server端拉取消息,主动权在Client手里,自己拉取到一定量消息后,处理妥当了再接着取。Pull方式的问题是循环拉取消息的间隔不好设定,间隔太短就处在一个“忙等”的状态,浪费资源;每个Pull的时间间隔太长,Server端有消息到来有可能没有被及时处理。
“长轮询”方式是通过Client端和Server端的配合,既拥有Pull的优点,又能达到保证实时性的目的。我们结合源码来分析:
代码清单1-3 发送Pull消息代码片段
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.ConsumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(Offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);


PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(

brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);

源码中有这一行设置语句requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis),设置Broker最长阻塞时间,默认设置是15秒,注意是Broker在没有新消息的时候才阻塞,有消息会立刻返回。
从Broker的源码中可以看出,服务端接到新消息请求后,如果队列里没有新消息,并不急于返回,通过一个循环不断查看状态,每次 waitForRunning一段时候(默认是5秒),然后后再Check。默认情况下当Broker一直没有新消息,第三次Check的时候,等待时间超过Request里面的 BrokerSuspendMaxTimeMillis,就返回空结果。在等待的过程中,Broker收到了新的消息后会直接调用notifyMessageArriving函数返回请求结果。“长轮询”的核心是,Broker端HOLD住客户端过来的请求一小段时间,在这个时间内有新消息到达,就利用现有的连接立刻返回消息给Consumer。“长轮询”的主动权还是掌握在Consumer手中,Broker即使有大量消息积压,也不会主动推送给Consumer。
长轮询方式的局限性,是在HOLD住Consumer请求的时候需要占用资源,它适合用在消息队列这种客户端连接数可控的场景中。
3.DefaultMQPullConsumer
使用DefaultMQPullConsumer像使用DefaultMQPushConsumer一样需要设置各种参数,写处理消息的函数,同时还需要做额外的事情。接下来结合org.apache.rocketmq.example.simple包中的例子源码来介绍。
示例代码的处理逻辑是逐个读取某Topic下所有Message Queue的内容,读完一遍后退出,主要处理额外的三件事情:
(1) 获取Message Queue并遍历
一个Topic包括多个Message Queue,如果这个Consumer需要获取Topic下所有的消息,就要遍历多有的Message Queue。如果有特殊情况,也可以选择某些特定的Message Queue来读取消息。
(2) 维护Offsetstore
从一个Message Queue里拉取消息的时候,要传入Offset参数(long类型的值),随着不断读取消息,Offset会不断增长。这个时候由用户负责把Offset存储下来,根据具体情况可以存到内存里、写到磁盘或者数据库里等。
(3) 根据不同的消息状态做不同的处理
拉取消息的请求发出后,会返回:FOUND,NO_MATCHED_MSG,NO_NEW_MSG,OFFSET_ILLEGAL四种状态,要根据每个状态做不同的处理。比较重要的两个状态是FOUNT和NO_NEW_MSG,分别表示获取到消息和没有新的消息
实际情况中可以把while(true)放到外层,达到无限循环的目的。因为PullConsumer需要用户自己处理遍历Message Queue、保存Offset,所以PullConsumer有更多的自主性和灵活性。

推荐阅读:
RocketMQ

RocketMQ实战与原理解析
作者:杨开元
定价:59.00元
•RocketMQ由阿里开源,Apache开源项目,经受多年流量峰值考验,在多个性能指标上远超同类产品
•作者是阿里资深数据专家,有多年RocketMQ使用经验,深入研究RocketMQ源代码,写作前与RocketMQ官方团队有深入沟通
•云栖社区官方出品,得到RocketMQ官方研发团队以及业界的多位专家的肯定和推荐

阅读原文:http://product.dangdang.com/25290633.html

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/101576.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 有极性电容和无极性电容的区别_非极性电容

    有极性电容和无极性电容的区别_非极性电容有极性电容与无极性电容的概述有极性电容与无极性电容的概述有极性电容的识别有极性电容于无极性电容的区别网友见解有极性电容与无极性电容的概述理想的电容,本来是没有极性的。但是在实际中,为了获得大容量,就使用了某些特殊的材料和结构,这就导致了实际的电容有些是有极性的。常见的有极性电容有铝电解电容,钽电解电容等。电解电容一般是容量相对比较大的。如果要做一个大容量的无极性电容,就没那…

    2022年8月22日
    12
  • QuoteName\生成Sql语句

    QuoteName\生成Sql语句ifexists(selectnamefromtempdb.dbo.sysobjectswhereid=object_id(N’tempdb.dbo.#tempWorkDate’)andtype=’u’)droptable#tempWorkDateCREATETABLE[#tempWorkDate]([WorkDate][varc…

    2022年7月25日
    8
  • mysql截取数字_mysql 截取字符串中的数字

    mysql截取数字_mysql 截取字符串中的数字展开全部selectREVERSE(right(REVERSE(filename),length(filename)-LEAST(if(Locate(‘0’,REVERSE(filename))>0,Locate(‘0’,REVERSE(filename)),999),if(Locate(‘1’,REVERSE(filename))>0,Locate(‘1’,REVERSE(f…

    2022年6月3日
    93
  • oracle number类型的数值存储空间是几个字节?

    oracle number类型的数值存储空间是几个字节?其实有公式可以计算:number(p,s)占用得空间为:length=floor((p+1)/2)+1备注:如果该数值为负数,需要再加一个字节。—————-例如:NUMBER(14,4)的类型数值,存储空间为selectfloor((14+1)/2)+1fromdual结果输出为:8

    2022年7月24日
    9
  • protostuff java_protostuff 及其注意事项

    protostuff java_protostuff 及其注意事项google开发的开源的序列化方案protocolbuffer(简称protobuf),它的好处很多,独立于语言,独立于平台,最最重要的是它的效率相当高,用protobuf序列化后的大小是json的10分之一,xml格式的20分之一,是二进制序列化的10分之一。protostuff是一个基于protobuf实现的序列化方法,它较于protobuf最明显的好处是,在几乎不损耗性能的情况下做到了不用…

    2022年5月2日
    84
  • php拼接循环拼接字符串数组,PHP数组拼接

    php拼接循环拼接字符串数组,PHP数组拼接最近的工作中老是要遇到将两个数组进行拼接的操作。下面总结一下数组拼接的几个函数及它们的不同点。PHP中两个数组合并可以使用+或者array_merge,但之间还是有区别的,而且这些区别如果了解不清楚项目中会要命的!主要区别是两个或者多个数组中如果出现相同键名,键名分为字符串或者数字,需要注意。1)键名为数字时,array_merge()后面的值将不会覆盖原来的值,而是附加到后面,但+合并数组则会把…

    2022年5月30日
    44

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号