RocketMQ 入门使用详解[通俗易懂]

RocketMQ 入门使用详解[通俗易懂]RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache基金会,已经于2016年11月成为 Apache 孵化项目,相信RocketMQ的未来会发挥着越来越大的作用,将有更多的开发者因此受益。 本文仅对RocketMQ的简单实用做入门性介绍,不对RocketMQ的底层原理进行深入介绍,后续文章将对RocketMQ的原理做详细介绍。

大家好,又见面了,我是你们的朋友全栈君。

       RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache基金会,已经于2016年11月成为 Apache 孵化项目,相信RocketMQ的未来会发挥着越来越大的作用,将有更多的开发者因此受益。

        本文仅对RocketMQ的简单实用做入门性介绍,不对RocketMQ的底层原理进行深入介绍,后续文章将对RocketMQ的原理做详细介绍。

        RocketMQ的Maven依赖:

   

<!-- https://mvnrepository.com/artifact/com.alibaba.rocketmq/rocketmq-client -->
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>3.2.6</version>
</dependency>

     MQ的消费类RocketMQConsumer.java:

    

package com.lance.rocketMQ.RocketMQ;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.MessageListener;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

import java.util.UUID;


/**
 * Created by lance on 2017/2/10.
 */
public class RocketMQConsumer {

    private DefaultMQPushConsumer consumer;

    private MessageListener listener;

    protected String nameServer;

    protected String groupName;

    protected String topics;

    public RocketMQConsumer(MessageListener listener, String nameServer, String groupName, String topics) {
        this.listener = listener;
        this.nameServer = nameServer;
        this.groupName = groupName;
        this.topics = topics;
    }

    public void init() {
        consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(nameServer);
        try {
            consumer.subscribe(topics, "*");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        consumer.setInstanceName(UUID.randomUUID().toString());
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.registerMessageListener((MessageListenerConcurrently) this.listener);

        try {
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        System.out.println("RocketMQConsumer Started! group=" + consumer.getConsumerGroup() + " instance=" + consumer.getInstanceName()
        );
    }


}

       MQ消息的监听接口类RocketMQListener.java

      

        

package com.lance.rocketMQ.RocketMQ;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * Created by lance on 2017/2/10.
 */
public class RocketMQListener  implements MessageListenerConcurrently {


    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//        System.out.println("get data from rocketMQ:" + msgs);
        for (MessageExt message : msgs) {

            String msg = new String(message.getBody());
            System.out.println("msg data from rocketMQ:" + msg);
        }

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

        MQ消息的生产者类RocketMQProducer.java

package com.lance.rocketMQ.RocketMQ;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.SendStatus;
import com.alibaba.rocketmq.common.message.Message;

import java.util.UUID;

/**
 * Created by lance on 2017/2/10.
 */
public class RocketMQProducer {

    private DefaultMQProducer sender;

    protected String nameServer;

    protected String groupName;

    protected String topics;

    public void init() {
        sender = new DefaultMQProducer(groupName);
        sender.setNamesrvAddr(nameServer);
        sender.setInstanceName(UUID.randomUUID().toString());
        try {
            sender.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public RocketMQProducer(String nameServer, String groupName, String topics) {
        this.nameServer = nameServer;
        this.groupName = groupName;
        this.topics = topics;
    }

    public void send(Message message) {

        message.setTopic(topics);

        try {
            SendResult result = sender.send(message);
            SendStatus status = result.getSendStatus();
            System.out.println("messageId=" + result.getMsgId() + ", status=" + status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

       测试RocketMQ的消费 RocketMQConsumerTest.java

package com.lance.rocketMQ.RocketMQ;

/**
 * Created by lance on 2017/2/10.
 */
public class RocketMQConsumerTest {


    public static void main(String[] args) {


        String mqNameServer = "172.10.254.2:9876";
        String mqTopics = "MQ-MSG-TOPICS-TEST";

        String consumerMqGroupName = "CONSUMER-MQ-GROUP";
        RocketMQListener mqListener = new RocketMQListener();
        RocketMQConsumer mqConsumer = new RocketMQConsumer(mqListener, mqNameServer, consumerMqGroupName, mqTopics);
        mqConsumer.init();


        try {
            Thread.sleep(1000 * 60L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

          run RocketMQConsumerTest.java 之后,控制台输出:

       

RocketMQConsumer Started! group=CONSUMER-MQ-GROUP instance=1eb7d308-4414-4658-90b5-e2cae3b793eb

            结果分析: 此时MQ对应的TOPIC中并没有响应的消息,故收不到消息,仅看到MQ消费者正常启动信息。

       

        MQ的生产者测试类:RocketMQProducerTest.java

       

package com.lance.rocketMQ.RocketMQ;

import com.alibaba.rocketmq.common.message.Message;

/**
 * Created by lance on 2017/2/10.
 */
public class RocketMQProducerTest {

    public static void main(String[] args) {

        String mqNameServer = "172.10.254.2:9876";
        String mqTopics = "MQ-MSG-TOPICS-TEST";

        String producerMqGroupName = "PRODUCER-MQ-GROUP";
        RocketMQProducer mqProducer = new RocketMQProducer(mqNameServer, producerMqGroupName, mqTopics);
        mqProducer.init();


        for (int i = 0; i < 5; i++) {

            Message message = new Message();
            message.setBody(("I send message to RocketMQ " + i).getBytes());
            mqProducer.send(message);
        }



    }

}

          run RocketMQProducerTest.java 之后,RocketMQProducerTest.java 对应的控制台输出为:

                

messageId=0A71290100002A9F00000003D0BB0832, status=SEND_OK
messageId=0A71290100002A9F00000003D0BB08BB, status=SEND_OK
messageId=0A71290100002A9F00000003D0BB0944, status=SEND_OK
messageId=0A71290100002A9F00000003D0BB09CD, status=SEND_OK
messageId=0A71290300002A9F000000005440AEED, status=SEND_OK

         结果分析:表明所有消息都已经正常发送,且被RocketMQ正常接收。

          此时查看RocketMQConsumerTest.java对应的控制台输出发生改变,输出内容变更如下:

         

RocketMQConsumer Started! group=CONSUMER-MQ-GROUP instance=1eb7d308-4414-4658-90b5-e2cae3b793eb
msg data from rocketMQ:I send message to RocketMQ 1
msg data from rocketMQ:I send message to RocketMQ 0
msg data from rocketMQ:I send message to RocketMQ 3
msg data from rocketMQ:I send message to RocketMQ 2
msg data from rocketMQ:I send message to RocketMQ 4

看,简单吧!

备注:小编自己使用了Apache版本的RocketMQ(即RocketMQ 4.*),发现只需要更改import的package的路径而已,不需要修改其他代码,请参考。

RocketMQ的重复问题解决方式:
a.MQ的消费端执行的操作具有幂等性,即无论多少次重复执行,其结果是一样的;
b.MQ的消费端做重复校验,比如将受到MQ消息的唯一编号保存到Redis中,即每次收到消息时,将检查唯一编号是否已经在Redis中,如果存在说明消息重复;否则将唯一编号放入到Redis中,可以根据系统需要设置唯一编号在Redis中的过期时间,以防止Redis溢出。

 

参考链接:1.分布式开放消息系统(RocketMQ)的原理与实践 (强烈推荐)

                    2.RocketMQ捐赠给Apache那些鲜为人知的故事

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/149571.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • dos下debug的使用「建议收藏」

    dos下debug的使用「建议收藏」起因:最近学习汇编需要对程序进行调试故作此笔记用法:一、用debug把程序运行起来:1.在dos下进入自己的程序所在目录xxx2.xxx>debugproc.exe把程序执行起来二、常用命令1.-u–查看当前的汇编代码(00FF:00000055)后面可以跟参数查看某段代码2.-t–单步执行

    2022年10月15日
    4
  • MySQL行转列函数[通俗易懂]

    MySQL行转列函数[通俗易懂]原文链接:http://www.360doc.com/content/18/0525/20/14808334_757019563.shtml概述好久没写SQL语句,今天看到问答中的一个问题,拿来研究一下。问题链接:关于Mysql的分级输出问题情景简介学校里面记录成绩,每个人的选课不一样,而且以后会添加课程,所以不需要把所有课程当作列。数据表里面数据如下图,使用姓名+课程作为联合主键(…

    2022年6月13日
    123
  • SpringMVC+easyUI CRUD 添加数据C

    SpringMVC+easyUI CRUD 添加数据C

    2022年1月30日
    36
  • 排序算法详细对比[通俗易懂]

    常见考点:1.不稳定排序有:选择排序、快速排序、希尔排序、堆排序

    2022年4月7日
    44
  • cultureinfo 类 java_为国家,语言组合创建自定义CultureInfo

    cultureinfo 类 java_为国家,语言组合创建自定义CultureInfo我正在开发一个.net4.5应用程序,需要多语言支持多文化等.以下是国家/语言的示例列表俄罗斯/俄罗斯比利时/法国比利时/荷兰对于上述所有内容,可以根据上述文化名称创建CultureInfo对象ru-RUfr-BEnl-BE当用户进入站点时,我将Thread.CurrentThread.CurrentCulture和Thread.CurrentThread.CurrentUICulture设置…

    2022年6月19日
    24
  • 抖音表白代码「建议收藏」

    最近抖音上火了一个由小伙伴自己制作的一个表白代码,很多小伙伴都在问这个表白代码是怎么写的?小编今天就为大家带来了抖音表白代码汇总!抖音表白代码示例1:msgbox(“做我女朋友好吗?”)msgbox(“房产证写你名字…保大…我妈会游泳…”)示例2:msgbox(“做我女朋友好吗?”)msgbox(“房产证写你名字…保大…我妈会游泳…”)x=msgbox(“做我女朋友…

    2022年4月13日
    436

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号