再次研究消息队列记的笔记——activemq

再次研究消息队列记的笔记——activemq

分布式事务–消息队列

1.思考

sso服务

用户服务

日志服务

购物服务(购物车合并)

短信服务

订单服务

库存服务

物流服务

如何让这么多的服务并行执行?【涉及到分布式事务:为了保证数据的一致性】

2.分布式事务

分布式事务:在分布式环境下,如何保证数据一致性

分布式事务会涉及到性能太低的一个通病。

方案:

  • xa协议下的两段式提交

  • xa两段式提交的进阶版:tcc

  • 基于消息,采用最终一致性策略的分布式事务

LNC 分布式框架.

分布式事务理论基础:CPA理论、BASE理论

3.XA协议

XA协议:数据库与事务管理器的一个标准。

在xa协议下,提交一个事务需要经过两个阶段

阶段一:预备提交

阶段二:提交

再次研究消息队列记的笔记------activemq

4.TCC

需要在业务层实现,try,confirm,和cancle的接口。

再次研究消息队列记的笔记------activemq

5.消息队列

在一个事务正在进行的同时,发出消息给其他的业务,如果消息发送失败,或者消息的执行失败,则回滚消息,重复执行,反复执行失败后,记录失败信息,后期补充性的处理;在消息系统中开启事务,消息的事务是指,保证消息被正常消费,否则回滚的一种机制

补偿机制:日志记录,定时器在某个时间再执行(重试执行)

重复执行,需要考虑幂等性处理逻辑。

6.疑问

  • 如何确保消息发送成功? 消息应答模式?

  • 消息发送失败如何处理?

  • 消息事务?

  • 消息幂等性如何处理?

  • 消息阻塞?死信队列。

消息队列

1.消息产品

RabbitMQ 、 Kafka、ActiveMQ

  • RabbitMQ的协议是AMQP(Advanced Message Queueing Protoco);AMQP通用行较强,非java环境经常使用,传输内容就是标准字符串。RabbitMQ用Erlang开发

  • ActiveMQ使用的是JMS(Java Messaging Service )协议,JMS是针对Java体系的传输协议,队列两端必须有JVM,所以如果开发环境都是java的话推荐使用ActiveMQ,可以用Java的一些对象进行传递比如Map、BLob、Stream等。ActiveMQ也支持AMQP协议。

  • Kafka性能超过ActiveMQ等传统MQ工具,集群扩展性好;Kafka在传输过程中可能会出现消息重复的情况,不保证发送顺序,没有消息事务功能;一般使用kafka处理大数据日志。

再次研究消息队列记的笔记------activemq

2.ActiveMQ

2.1 整合activemq

只需要加入整合依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
   <exclusions>
      <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
   </exclusions>
</dependency>

<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-pool</artifactId>
   <version>5.15.2</version>
   <exclusions>
      <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
   </exclusions>
</dependency>

2.2 队列消息

Queue 队列模式

Topic 发布订阅模式

Consumer 使用监听器监听MQ上是否有消息。

2.2.1 生产者


import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;

import javax.jms.*;

public class Product {
   
    public static void main(String[] args) {
   
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.7:61616");
        try {
   
            Connection connection = connectionFactory.createConnection();
            connection.start();
            //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED); //Session.SESSION.TRASACTED 开启消息事务
            Queue testqueue = session.createQueue("TEST1");

            MessageProducer producer = session.createProducer(testqueue);
            TextMessage textMessage = new ActiveMQTextMessage();
            textMessage.setText("今天天气真好!我想出去走一走");
            producer.setDeliveryMode(DeliveryMode.PERSISTENT); //消息持久化
            producer.send(textMessage);
            session.commit();
            connection.close();
        }catch (Exception e){
   
            e.printStackTrace();
        }

    }
}

2.2.2 消费者

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Consumer {
   
    public static void main(String[] args) {
   
        ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.1.7:61616");
        try {
   
            Connection connection = connect.createConnection();
            connection.start();
            //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination testqueue = session.createQueue("TEST1");

            MessageConsumer consumer = session.createConsumer(testqueue);
            consumer.setMessageListener(new MessageListener() {
   
                @Override
                public void onMessage(Message message) {
   
                    if(message instanceof TextMessage){
   
                        try {
   
                            String text = ((TextMessage) message).getText();
                            System.out.println(text);

                           // session.rollback();
                        } catch (JMSException e) {
   
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                             session.rollback();
                        }
                    }
                }
            });

        }catch (Exception e){
   
            e.printStackTrace();;
        }
    }
}

2.3 消息事务

producer提交时的事务 事务开启 只执行send并不会提交到队列中,只有当执行session.commit()时,消息才被真正的提交到队列中。
事务不开启 只要执行send,就进入到队列中。
consumer 接收时的事务 事务开启,签收必须写Session.SESSION_TRANSACTED 收到消息后,消息并没有真正的被消费。消息只是被锁住。一旦出现该线程死掉、抛异常,或者程序执行了session.rollback()那么消息会释放,重新回到队列中被别的消费端再次消费。
事务不开启,签收方式选择Session.AUTO_ACKNOWLEDGE 只要调用comsumer.receive方法 ,自动确认。
事务不开启,签收方式选择Session.CLIENT_ACKNOWLEDGE 需要客户端执行 message.acknowledge(),否则视为未提交状态,线程结束后,其他线程还可以接收到。 这种方式跟事务模式很像,区别是不能手动回滚,而且可以单独确认某个消息。
事务不开启,签收方式选择Session.DUPS_OK_ACKNOWLEDGE 在Topic模式下做批量签收时用的,可以提高性能。但是某些情况消息可能会被重复提交,使用这种模式的consumer要可以处理重复提交的问题。

2.4 消息持久化

通过producer.setDeliveryMode(DeliveryMode.PERSISTENT) 进行设置

持久化的好处就是当activemq宕机的话,消息队列中的消息不会丢失。非持久化会丢失。但是会消耗一定的性能。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/100743.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • HTML去除横向滚动条

    HTML去除横向滚动条style layui layer content overflow x hidden important style 通常在 DIV 样式中加入后刷新页面即可 divstyle overflow x hidden important divstyle overflow x hidden important

    2025年11月29日
    4
  • onedrive免费扩容25t_onedrive怎么免费扩容1T

    onedrive免费扩容25t_onedrive怎么免费扩容1TOneDrive存储我们都知道没有开office365,自己onedrive的储存空间只有5GB,onenote做笔记以及用onedrive同步文档空间不够,但是又不想开office365;所以在网上看到别人说onedrive可以推荐别人注册,可以扩容10GB;加起来一共15GB,用来做笔记完全够用;或许有大佬会说可以弄到Office教育版的微软账号,有5T或1T的存储空间,但是这个会涉及到账号里面文档的安全性,这种账号是属于教育机构的,全局管理员可以有权查看里面储存的文件并且有权删去账号,这样的

    2025年10月17日
    2
  • 用EasyBoot轻松做启动光盘

    用EasyBoot轻松做启动光盘原文转自:电脑技术资料园BrianLiu学习之园原版系统安装盘的缺憾——不管是Windows98还是WindowsNT/2000/XP,仅能实现单一系统的初始安装,缺少调试维护、系统恢复、DOS杀毒等工具。虽然市面上出现了N合1光盘,但一般体积庞大,且无法满足自己的需要。  用EasyBoot刻盘正好可以解决这个问题。EasyBoot是一款集成…

    2022年7月26日
    4
  • 数据库课程设计—超市零售信息管理系统(Python实现)

    数据库课程设计—超市零售信息管理系统(Python实现)数据库课程设计超市零售信息管理系统 Python 实现 SQLServer 文章目录数据库课程设计前言 一 设计目的 二 设计内容一 需求分析 一 设计概念 二 功能说明 1 进货管理 2 销售管理 3 库存管理 4 人员管理 三 功能模块图二 概念结构设计三 逻辑结构设计四 代码实现 一 实现 Python 连接 SQLSevere 数据库 二 创建数据库表 三 插入数据 四 创建界面按钮 并实现数据库的 增删改查 五 总结前言 一 设计目的学生根据所学的数据库系统原理与程序设计的知识 能够针对一个

    2025年10月20日
    2
  • 齿轮常见故障与特征频率及其谐波、边频带

    齿轮常见故障与特征频率及其谐波、边频带

    2021年11月22日
    50
  • 缓存穿透,缓存击穿,缓存雪崩解决方案分析

    缓存穿透,缓存击穿,缓存雪崩解决方案分析前言设计一个缓存系统,不得不要考虑的问题就是:缓存穿透、缓存击穿与失效时的雪崩效应。缓存穿透缓存穿透是指查询一个一定不存在的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个存在的数据每次请求都要到存储层去查询,失去了缓存的意义。在流量大时,可能DB就挂掉了,要是有人利用不存在的key频繁攻击我们的应用,这就是漏洞。解决方案

    2022年6月30日
    21

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号