RocketMQ 入门使用详解[通俗易懂]

RocketMQ 入门使用详解[通俗易懂]RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache基金会,已经于2016年11月成为 Apache 孵化项目,相信RocketMQ的未来会发挥着越来越大的作用,将有更多的开发者因此受益。 本文仅对RocketMQ的简单实用做入门性介绍,不对RocketMQ的底层原理进行深入介绍,后续文章将对RocketMQ的原理做详细介绍。

大家好,又见面了,我是你们的朋友全栈君。

       RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache基金会,已经于2016年11月成为 Apache 孵化项目,相信RocketMQ的未来会发挥着越来越大的作用,将有更多的开发者因此受益。

        本文仅对RocketMQ的简单实用做入门性介绍,不对RocketMQ的底层原理进行深入介绍,后续文章将对RocketMQ的原理做详细介绍。

        RocketMQ的Maven依赖:

   

<!-- https://mvnrepository.com/artifact/com.alibaba.rocketmq/rocketmq-client -->
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>3.2.6</version>
</dependency>

     MQ的消费类RocketMQConsumer.java:

    

package com.lance.rocketMQ.RocketMQ;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.MessageListener;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

import java.util.UUID;


/**
 * Created by lance on 2017/2/10.
 */
public class RocketMQConsumer {

    private DefaultMQPushConsumer consumer;

    private MessageListener listener;

    protected String nameServer;

    protected String groupName;

    protected String topics;

    public RocketMQConsumer(MessageListener listener, String nameServer, String groupName, String topics) {
        this.listener = listener;
        this.nameServer = nameServer;
        this.groupName = groupName;
        this.topics = topics;
    }

    public void init() {
        consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(nameServer);
        try {
            consumer.subscribe(topics, "*");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        consumer.setInstanceName(UUID.randomUUID().toString());
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.registerMessageListener((MessageListenerConcurrently) this.listener);

        try {
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        System.out.println("RocketMQConsumer Started! group=" + consumer.getConsumerGroup() + " instance=" + consumer.getInstanceName()
        );
    }


}

       MQ消息的监听接口类RocketMQListener.java

      

        

package com.lance.rocketMQ.RocketMQ;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * Created by lance on 2017/2/10.
 */
public class RocketMQListener  implements MessageListenerConcurrently {


    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//        System.out.println("get data from rocketMQ:" + msgs);
        for (MessageExt message : msgs) {

            String msg = new String(message.getBody());
            System.out.println("msg data from rocketMQ:" + msg);
        }

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

        MQ消息的生产者类RocketMQProducer.java

package com.lance.rocketMQ.RocketMQ;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.SendStatus;
import com.alibaba.rocketmq.common.message.Message;

import java.util.UUID;

/**
 * Created by lance on 2017/2/10.
 */
public class RocketMQProducer {

    private DefaultMQProducer sender;

    protected String nameServer;

    protected String groupName;

    protected String topics;

    public void init() {
        sender = new DefaultMQProducer(groupName);
        sender.setNamesrvAddr(nameServer);
        sender.setInstanceName(UUID.randomUUID().toString());
        try {
            sender.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public RocketMQProducer(String nameServer, String groupName, String topics) {
        this.nameServer = nameServer;
        this.groupName = groupName;
        this.topics = topics;
    }

    public void send(Message message) {

        message.setTopic(topics);

        try {
            SendResult result = sender.send(message);
            SendStatus status = result.getSendStatus();
            System.out.println("messageId=" + result.getMsgId() + ", status=" + status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

       测试RocketMQ的消费 RocketMQConsumerTest.java

package com.lance.rocketMQ.RocketMQ;

/**
 * Created by lance on 2017/2/10.
 */
public class RocketMQConsumerTest {


    public static void main(String[] args) {


        String mqNameServer = "172.10.254.2:9876";
        String mqTopics = "MQ-MSG-TOPICS-TEST";

        String consumerMqGroupName = "CONSUMER-MQ-GROUP";
        RocketMQListener mqListener = new RocketMQListener();
        RocketMQConsumer mqConsumer = new RocketMQConsumer(mqListener, mqNameServer, consumerMqGroupName, mqTopics);
        mqConsumer.init();


        try {
            Thread.sleep(1000 * 60L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

          run RocketMQConsumerTest.java 之后,控制台输出:

       

RocketMQConsumer Started! group=CONSUMER-MQ-GROUP instance=1eb7d308-4414-4658-90b5-e2cae3b793eb

            结果分析: 此时MQ对应的TOPIC中并没有响应的消息,故收不到消息,仅看到MQ消费者正常启动信息。

       

        MQ的生产者测试类:RocketMQProducerTest.java

       

package com.lance.rocketMQ.RocketMQ;

import com.alibaba.rocketmq.common.message.Message;

/**
 * Created by lance on 2017/2/10.
 */
public class RocketMQProducerTest {

    public static void main(String[] args) {

        String mqNameServer = "172.10.254.2:9876";
        String mqTopics = "MQ-MSG-TOPICS-TEST";

        String producerMqGroupName = "PRODUCER-MQ-GROUP";
        RocketMQProducer mqProducer = new RocketMQProducer(mqNameServer, producerMqGroupName, mqTopics);
        mqProducer.init();


        for (int i = 0; i < 5; i++) {

            Message message = new Message();
            message.setBody(("I send message to RocketMQ " + i).getBytes());
            mqProducer.send(message);
        }



    }

}

          run RocketMQProducerTest.java 之后,RocketMQProducerTest.java 对应的控制台输出为:

                

messageId=0A71290100002A9F00000003D0BB0832, status=SEND_OK
messageId=0A71290100002A9F00000003D0BB08BB, status=SEND_OK
messageId=0A71290100002A9F00000003D0BB0944, status=SEND_OK
messageId=0A71290100002A9F00000003D0BB09CD, status=SEND_OK
messageId=0A71290300002A9F000000005440AEED, status=SEND_OK

         结果分析:表明所有消息都已经正常发送,且被RocketMQ正常接收。

          此时查看RocketMQConsumerTest.java对应的控制台输出发生改变,输出内容变更如下:

         

RocketMQConsumer Started! group=CONSUMER-MQ-GROUP instance=1eb7d308-4414-4658-90b5-e2cae3b793eb
msg data from rocketMQ:I send message to RocketMQ 1
msg data from rocketMQ:I send message to RocketMQ 0
msg data from rocketMQ:I send message to RocketMQ 3
msg data from rocketMQ:I send message to RocketMQ 2
msg data from rocketMQ:I send message to RocketMQ 4

看,简单吧!

备注:小编自己使用了Apache版本的RocketMQ(即RocketMQ 4.*),发现只需要更改import的package的路径而已,不需要修改其他代码,请参考。

RocketMQ的重复问题解决方式:
a.MQ的消费端执行的操作具有幂等性,即无论多少次重复执行,其结果是一样的;
b.MQ的消费端做重复校验,比如将受到MQ消息的唯一编号保存到Redis中,即每次收到消息时,将检查唯一编号是否已经在Redis中,如果存在说明消息重复;否则将唯一编号放入到Redis中,可以根据系统需要设置唯一编号在Redis中的过期时间,以防止Redis溢出。

 

参考链接:1.分布式开放消息系统(RocketMQ)的原理与实践 (强烈推荐)

                    2.RocketMQ捐赠给Apache那些鲜为人知的故事

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/149571.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 我国人口老龄化预测_中国人口老龄人口比例

    我国人口老龄化预测_中国人口老龄人口比例1、国内人口增速逐步下滑我国人口总量将近14亿,增速持续降低,城镇化率接近60%。建国以来,随着经济社会发展,我国人口总量从1949年的5.4亿人,增长至2018年13.9亿人。从增速来看,70年代人口总量的增速迅速下滑至1.3%-1.5%左右,90年代以来,人口总量增速持续缓慢下滑,近年保持在0.5%左右,但2018年增速进一步降低至0.4%。另一方面,中国城镇化率不断提升,到2018年已经达到59.58%。2018年我国人口总量为13.9亿人,同比增长0.4%,城镇化率59.58%数据来

    2022年9月5日
    18
  • LOADRUNNER8.1卸载

    LOADRUNNER8.1卸载

    2021年12月1日
    40
  • 多个单列索引和联合索引的区别详解

    多个单列索引和联合索引的区别详解背景:为了提高数据库效率,建索引是家常便饭;那么当查询条件为2个及以上时,我们是创建多个单列索引还是创建一个联合索引好呢?他们之间的区别是什么?哪个效率高呢?我在这里详细测试分析下。一、联合索引测试注:Mysql版本为5.7.20创建测试表(表记录数为63188):CREATETABLE`t_mobilesms_11`(`id`bigint(20)NOT…

    2022年6月4日
    44
  • 4月idea激活码_通用破解码「建议收藏」

    4月idea激活码_通用破解码,https://javaforall.net/100143.html。详细ieda激活码不妨到全栈程序员必看教程网一起来了解一下吧!

    2022年3月16日
    75
  • 关于 sso 博客大巴的神仙的一点思路

    关于 sso 博客大巴的神仙的一点思路
    聊天中的一些片段。。。可以记录参考下
     
    1每个应用独立维护自己的session
     
    2用户中心是一个特殊的应用,负责用户登录验证某个需要登录的应用发现没有登录状态就跳到用户中心用户中心判断有没在自己这里登录过,登录过就返回用户信息,没登陆过就先登录,然后记自己的session别的应用收到以后根据用户信息记自己的session之后只要自己的session还在,就不关用户中心的事了

    2022年7月12日
    13
  • status:searchingServer_physical distancing

    status:searchingServer_physical distancingTitleSpatialInformationGuidedConvolutionforReal-TimeRGBDSemanticSegmentation标题空间信息引导的卷积用于实时RGBD语义分割pdfhttps://arxiv.org/pdf/2004.04534v1.pdf摘要  已知3D空间信息对于语义分割任务是有益的。大多数现有方法都…

    2022年9月2日
    3

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号