再次研究消息队列记的笔记——activemq

再次研究消息队列记的笔记——activemq

分布式事务–消息队列

1.思考

sso服务

用户服务

日志服务

购物服务(购物车合并)

短信服务

订单服务

库存服务

物流服务

如何让这么多的服务并行执行?【涉及到分布式事务:为了保证数据的一致性】

2.分布式事务

分布式事务:在分布式环境下,如何保证数据一致性

分布式事务会涉及到性能太低的一个通病。

方案:

  • xa协议下的两段式提交

  • xa两段式提交的进阶版:tcc

  • 基于消息,采用最终一致性策略的分布式事务

LNC 分布式框架.

分布式事务理论基础:CPA理论、BASE理论

3.XA协议

XA协议:数据库与事务管理器的一个标准。

在xa协议下,提交一个事务需要经过两个阶段

阶段一:预备提交

阶段二:提交

再次研究消息队列记的笔记------activemq

4.TCC

需要在业务层实现,try,confirm,和cancle的接口。

再次研究消息队列记的笔记------activemq

5.消息队列

在一个事务正在进行的同时,发出消息给其他的业务,如果消息发送失败,或者消息的执行失败,则回滚消息,重复执行,反复执行失败后,记录失败信息,后期补充性的处理;在消息系统中开启事务,消息的事务是指,保证消息被正常消费,否则回滚的一种机制

补偿机制:日志记录,定时器在某个时间再执行(重试执行)

重复执行,需要考虑幂等性处理逻辑。

6.疑问

  • 如何确保消息发送成功? 消息应答模式?

  • 消息发送失败如何处理?

  • 消息事务?

  • 消息幂等性如何处理?

  • 消息阻塞?死信队列。

消息队列

1.消息产品

RabbitMQ 、 Kafka、ActiveMQ

  • RabbitMQ的协议是AMQP(Advanced Message Queueing Protoco);AMQP通用行较强,非java环境经常使用,传输内容就是标准字符串。RabbitMQ用Erlang开发

  • ActiveMQ使用的是JMS(Java Messaging Service )协议,JMS是针对Java体系的传输协议,队列两端必须有JVM,所以如果开发环境都是java的话推荐使用ActiveMQ,可以用Java的一些对象进行传递比如Map、BLob、Stream等。ActiveMQ也支持AMQP协议。

  • Kafka性能超过ActiveMQ等传统MQ工具,集群扩展性好;Kafka在传输过程中可能会出现消息重复的情况,不保证发送顺序,没有消息事务功能;一般使用kafka处理大数据日志。

再次研究消息队列记的笔记------activemq

2.ActiveMQ

2.1 整合activemq

只需要加入整合依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
   <exclusions>
      <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
   </exclusions>
</dependency>

<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-pool</artifactId>
   <version>5.15.2</version>
   <exclusions>
      <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
   </exclusions>
</dependency>

2.2 队列消息

Queue 队列模式

Topic 发布订阅模式

Consumer 使用监听器监听MQ上是否有消息。

2.2.1 生产者


import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;

import javax.jms.*;

public class Product {
   
    public static void main(String[] args) {
   
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.7:61616");
        try {
   
            Connection connection = connectionFactory.createConnection();
            connection.start();
            //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED); //Session.SESSION.TRASACTED 开启消息事务
            Queue testqueue = session.createQueue("TEST1");

            MessageProducer producer = session.createProducer(testqueue);
            TextMessage textMessage = new ActiveMQTextMessage();
            textMessage.setText("今天天气真好!我想出去走一走");
            producer.setDeliveryMode(DeliveryMode.PERSISTENT); //消息持久化
            producer.send(textMessage);
            session.commit();
            connection.close();
        }catch (Exception e){
   
            e.printStackTrace();
        }

    }
}

2.2.2 消费者

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Consumer {
   
    public static void main(String[] args) {
   
        ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.1.7:61616");
        try {
   
            Connection connection = connect.createConnection();
            connection.start();
            //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination testqueue = session.createQueue("TEST1");

            MessageConsumer consumer = session.createConsumer(testqueue);
            consumer.setMessageListener(new MessageListener() {
   
                @Override
                public void onMessage(Message message) {
   
                    if(message instanceof TextMessage){
   
                        try {
   
                            String text = ((TextMessage) message).getText();
                            System.out.println(text);

                           // session.rollback();
                        } catch (JMSException e) {
   
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                             session.rollback();
                        }
                    }
                }
            });

        }catch (Exception e){
   
            e.printStackTrace();;
        }
    }
}

2.3 消息事务

producer提交时的事务 事务开启 只执行send并不会提交到队列中,只有当执行session.commit()时,消息才被真正的提交到队列中。
事务不开启 只要执行send,就进入到队列中。
consumer 接收时的事务 事务开启,签收必须写Session.SESSION_TRANSACTED 收到消息后,消息并没有真正的被消费。消息只是被锁住。一旦出现该线程死掉、抛异常,或者程序执行了session.rollback()那么消息会释放,重新回到队列中被别的消费端再次消费。
事务不开启,签收方式选择Session.AUTO_ACKNOWLEDGE 只要调用comsumer.receive方法 ,自动确认。
事务不开启,签收方式选择Session.CLIENT_ACKNOWLEDGE 需要客户端执行 message.acknowledge(),否则视为未提交状态,线程结束后,其他线程还可以接收到。 这种方式跟事务模式很像,区别是不能手动回滚,而且可以单独确认某个消息。
事务不开启,签收方式选择Session.DUPS_OK_ACKNOWLEDGE 在Topic模式下做批量签收时用的,可以提高性能。但是某些情况消息可能会被重复提交,使用这种模式的consumer要可以处理重复提交的问题。

2.4 消息持久化

通过producer.setDeliveryMode(DeliveryMode.PERSISTENT) 进行设置

持久化的好处就是当activemq宕机的话,消息队列中的消息不会丢失。非持久化会丢失。但是会消耗一定的性能。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/100743.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • bash找不到命令_bash sed

    bash找不到命令_bash sedCentos7默认安装了openJDK,jps命令不能使用,如果jdk是重新安装指定的版本,默认不需要重新安装jps服务[root@maven-test~]#jpsbash:jps:commandnotfound…解决办法[root@maven-test~]#yuminstall-yjava-1.8.0-openjdk-devel再次执行[root@maven-test~]#jps20755Jps[root@maven-test~]#…

    2022年9月16日
    4
  • python字符串的使用方法_python字符串是什么

    python字符串的使用方法_python字符串是什么python字符串常用方法find(sub[,start[,end]])在索引start和end之间查找字符串sub​找到,则返回最左端的索引值,未找到,则返回-1​start和end都可

    2022年8月6日
    7
  • 阅读软件怎么添加书源_认识一波苹果安卓手机上,那些以一敌百的小说漫画软件…[通俗易懂]

    阅读软件怎么添加书源_认识一波苹果安卓手机上,那些以一敌百的小说漫画软件…[通俗易懂]哈喽大家好,我是无知便是罪,专注于收集和分享互联网上有价值的好东西。今天让我们继续分享一些可以自定义源的软件吧。看过我视频的老用户一听到这个词肯定两眼放光了。像我们之前推荐的看小说的阅读,看漫画的异次元和看影视作品的海阔,他们有一个共同的特点,就是允许用户自定义上传一些书源啦漫画源和影视源。这样你在搜索作品的时候呢,软件就会在这些源头里面进行抓取。如果遇到收费的内容呢,你还可以免费换源,…

    2022年6月20日
    40
  • 学习SQL Server这一篇就够了

    学习SQL Server这一篇就够了目录第一章数据库概述1.1、数据库的好处1.2、数据库的常见概念1.3、数据库的存储特点1.4、数据库的常见分类1.5、SQL语言的分类第二章SQLServer概述2.1、SQLServer的概述2.2、SQLServer的下载2.3、SQLServer的安装2.4、SQLServer的第一种连接2.5、SQLServer的第二种连接2.6、SQLServer的连接说明第三章SQLServer数据库管理3.1、创建数据库3.1.1、界面方式3.1.2、命令方式3.2、修改数据库3.2.

    2022年7月27日
    7
  • 用通俗易懂的大白话讲解Map/Reduce原理「建议收藏」

    用通俗易懂的大白话讲解Map/Reduce原理「建议收藏」下面是我自己的微信公众号(不定期更新JAVA、大数据、个人成长等干货)1、公众号上有经典的技术电子书可以免费领2、大家有问题可以在公众号问我,只要你问了我就会回复(相互交流)也可以扫描下面二维码,加我个人微信,和我直接沟通Hadoop简介Hadoop就是一个实现了Google云计算系统的开源系统,包括并行计算模型Map/Reduce,分布式文件系统HDFS,以及……

    2022年7月26日
    3
  • HttpCanary教程_jquery post json

    HttpCanary教程_jquery post jsonHttpResponse对象Django服务器接收到客户端发送过来的请求后,会将提交上来的这些数据封装成一个HttpRequest对象传给视图函数。那么视图函数在处理完相关的逻辑后,也需要返回一个响

    2022年8月7日
    5

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号