再次研究消息队列记的笔记——activemq

再次研究消息队列记的笔记——activemq

分布式事务–消息队列

1.思考

sso服务

用户服务

日志服务

购物服务(购物车合并)

短信服务

订单服务

库存服务

物流服务

如何让这么多的服务并行执行?【涉及到分布式事务:为了保证数据的一致性】

2.分布式事务

分布式事务:在分布式环境下,如何保证数据一致性

分布式事务会涉及到性能太低的一个通病。

方案:

  • xa协议下的两段式提交

  • xa两段式提交的进阶版:tcc

  • 基于消息,采用最终一致性策略的分布式事务

LNC 分布式框架.

分布式事务理论基础:CPA理论、BASE理论

3.XA协议

XA协议:数据库与事务管理器的一个标准。

在xa协议下,提交一个事务需要经过两个阶段

阶段一:预备提交

阶段二:提交

再次研究消息队列记的笔记------activemq

4.TCC

需要在业务层实现,try,confirm,和cancle的接口。

再次研究消息队列记的笔记------activemq

5.消息队列

在一个事务正在进行的同时,发出消息给其他的业务,如果消息发送失败,或者消息的执行失败,则回滚消息,重复执行,反复执行失败后,记录失败信息,后期补充性的处理;在消息系统中开启事务,消息的事务是指,保证消息被正常消费,否则回滚的一种机制

补偿机制:日志记录,定时器在某个时间再执行(重试执行)

重复执行,需要考虑幂等性处理逻辑。

6.疑问

  • 如何确保消息发送成功? 消息应答模式?

  • 消息发送失败如何处理?

  • 消息事务?

  • 消息幂等性如何处理?

  • 消息阻塞?死信队列。

消息队列

1.消息产品

RabbitMQ 、 Kafka、ActiveMQ

  • RabbitMQ的协议是AMQP(Advanced Message Queueing Protoco);AMQP通用行较强,非java环境经常使用,传输内容就是标准字符串。RabbitMQ用Erlang开发

  • ActiveMQ使用的是JMS(Java Messaging Service )协议,JMS是针对Java体系的传输协议,队列两端必须有JVM,所以如果开发环境都是java的话推荐使用ActiveMQ,可以用Java的一些对象进行传递比如Map、BLob、Stream等。ActiveMQ也支持AMQP协议。

  • Kafka性能超过ActiveMQ等传统MQ工具,集群扩展性好;Kafka在传输过程中可能会出现消息重复的情况,不保证发送顺序,没有消息事务功能;一般使用kafka处理大数据日志。

再次研究消息队列记的笔记------activemq

2.ActiveMQ

2.1 整合activemq

只需要加入整合依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
   <exclusions>
      <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
   </exclusions>
</dependency>

<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-pool</artifactId>
   <version>5.15.2</version>
   <exclusions>
      <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
   </exclusions>
</dependency>

2.2 队列消息

Queue 队列模式

Topic 发布订阅模式

Consumer 使用监听器监听MQ上是否有消息。

2.2.1 生产者


import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;

import javax.jms.*;

public class Product {
   
    public static void main(String[] args) {
   
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.7:61616");
        try {
   
            Connection connection = connectionFactory.createConnection();
            connection.start();
            //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED); //Session.SESSION.TRASACTED 开启消息事务
            Queue testqueue = session.createQueue("TEST1");

            MessageProducer producer = session.createProducer(testqueue);
            TextMessage textMessage = new ActiveMQTextMessage();
            textMessage.setText("今天天气真好!我想出去走一走");
            producer.setDeliveryMode(DeliveryMode.PERSISTENT); //消息持久化
            producer.send(textMessage);
            session.commit();
            connection.close();
        }catch (Exception e){
   
            e.printStackTrace();
        }

    }
}

2.2.2 消费者

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Consumer {
   
    public static void main(String[] args) {
   
        ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.1.7:61616");
        try {
   
            Connection connection = connect.createConnection();
            connection.start();
            //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination testqueue = session.createQueue("TEST1");

            MessageConsumer consumer = session.createConsumer(testqueue);
            consumer.setMessageListener(new MessageListener() {
   
                @Override
                public void onMessage(Message message) {
   
                    if(message instanceof TextMessage){
   
                        try {
   
                            String text = ((TextMessage) message).getText();
                            System.out.println(text);

                           // session.rollback();
                        } catch (JMSException e) {
   
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                             session.rollback();
                        }
                    }
                }
            });

        }catch (Exception e){
   
            e.printStackTrace();;
        }
    }
}

2.3 消息事务

producer提交时的事务 事务开启 只执行send并不会提交到队列中,只有当执行session.commit()时,消息才被真正的提交到队列中。
事务不开启 只要执行send,就进入到队列中。
consumer 接收时的事务 事务开启,签收必须写Session.SESSION_TRANSACTED 收到消息后,消息并没有真正的被消费。消息只是被锁住。一旦出现该线程死掉、抛异常,或者程序执行了session.rollback()那么消息会释放,重新回到队列中被别的消费端再次消费。
事务不开启,签收方式选择Session.AUTO_ACKNOWLEDGE 只要调用comsumer.receive方法 ,自动确认。
事务不开启,签收方式选择Session.CLIENT_ACKNOWLEDGE 需要客户端执行 message.acknowledge(),否则视为未提交状态,线程结束后,其他线程还可以接收到。 这种方式跟事务模式很像,区别是不能手动回滚,而且可以单独确认某个消息。
事务不开启,签收方式选择Session.DUPS_OK_ACKNOWLEDGE 在Topic模式下做批量签收时用的,可以提高性能。但是某些情况消息可能会被重复提交,使用这种模式的consumer要可以处理重复提交的问题。

2.4 消息持久化

通过producer.setDeliveryMode(DeliveryMode.PERSISTENT) 进行设置

持久化的好处就是当activemq宕机的话,消息队列中的消息不会丢失。非持久化会丢失。但是会消耗一定的性能。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/100743.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 群晖Virtual Machine Manager虚拟机安装OpenWrt软路由作为旁路由的详细步骤

    群晖Virtual Machine Manager虚拟机安装OpenWrt软路由作为旁路由的详细步骤0.前言:本来一直都是在Windows10的Hyper-V中虚拟软路由的,直到有一天突发奇想,手贱在windows10宿主机中安装了个安卓模拟器,由于众所周知的原因,安卓模拟器是不能同时与Hyper-V虚拟机共存的,虽然我在安装后运行安卓模拟器的时候没有去点击那个关闭Hyper-V的提示按钮,并且迅速点击了退出按钮,但是悲剧还是发生了,我的Windows10宿主机在重启后自动关闭了Hyper-V功能,导致我在其中安装的openwrt旁路由、centos测试环境都熄火了!然后就是赶紧在【程序】中添加【Hy

    2022年6月1日
    139
  • Android – match_parent 和 fill_parent差异

    Android – match_parent 和 fill_parent差异

    2022年1月2日
    52
  • Ubuntu如何安装vscode_ubuntu20.0.4 vscode配置c++环境

    Ubuntu如何安装vscode_ubuntu20.0.4 vscode配置c++环境ubuntu安装vscode教程通过官方PPA安装Ubuntumake1、执行sudoadd-apt-repositoryppa:ubuntu-desktop/ubuntu-make命令2、执行sudoapt-getupdate命令3、执行sudoapt-get…

    2022年9月17日
    2
  • 为了写论文给 Linux “投毒”, Linux 内核维护者封杀明尼苏达大学「建议收藏」

    为了写论文给 Linux “投毒”, Linux 内核维护者封杀明尼苏达大学「建议收藏」Linux内核稳定分支的维护者GregKroah-Hartman决定禁止美国明尼苏达大学向主线Linux内核提交补丁,因为他们故意提交有安全影响的可疑代码,他们以“如果向开源计划提交代码的方式注入漏洞,开源社区将如何处理”进行了专门研究,并且还发布了一篇论文:《OntheFeasibilityofStealthilyIntroducingVulnerabilitiesinOpen-SourceSoftwareviaHypocriteCommits》论文地址:https://

    2022年7月23日
    19
  • php取得json_decode中的值,php json decode-获取值[通俗易懂]

    php取得json_decode中的值,php json decode-获取值[通俗易懂]正如Danp已经说过的,返回的JSON包含在函数调用中(由jsoncallback=json)你不能完全摆脱这个,但是,只是用AreaSearch?jsoncallback=&lat=41.1131514&lng=-74.0437521至少删除json在字符串的开头,您可以通过以下方式除去括号:$json=trim(trim($json),”();”);给予:{items:[…

    2022年7月17日
    29
  • windows7系统 您的账户已被停用。请向系统管理员咨询

    windows7系统 您的账户已被停用。请向系统管理员咨询问题细节描述:前几天好像是想换个用户桌面,换个用户桌面,首先把Administrator用户给禁用,然后把现在使用的用户名给删除。重启电脑,结果进不去了,显示这个错误提示:您的账户已被停用。请向系统管理员咨询解决办法:1.首先重启–(正常启动)2.按F8–(这个大家都知道-开机选项)3.选择安全模式–(注意:不是带命令的安全模式,是安全模式。F8第

    2025年6月23日
    2

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号