1.LCN是什么
LCN是国产开源的分布式事务处理框架。LCN即:lock(锁定事务单元)、confirm(确认事务模块状态)、notify(通知事务)。
2.首先介绍3.0与4.0之前的差异
2.1.地址

2.2.添加升级如下功能

(1)3.0虽然有事务补偿机制,但4.0在此基础上不仅添加事务补偿机制的策性,还添加了管理的后台可以看到补偿的数据;同时也添加了一个回调地址,可以在补偿之前可以最先知道这次补偿的数据,也可以为我们的框架使用者提供一个决策权。
(2)同4.0时添加的插件扩展机制,也就是说他更加开放了,他可以可以容纳更多的rpc框架,也可以更多的支持db框架,比如mongodb、redis,还有将来一些框架,如ES等等。
3.LCN4.0原理
3.1.架构介绍

有图可得,lcn是通过nginx作为负载均衡的转发,也就是作为Txmanager的负载均衡的一个转发服务器;然后再是我们的TxManager,也就是事务管理器,然后事务管理器依赖两个服务,一个是redis服务,一个是Eureka服务集群;Eureka集群是用于我们TxManager之间的相互服务发现。redis是用于存放我们事务组的信息以及补偿的信息。然后模块A与模块B他们都需要去配置上我们TxClient的包架构(代码的包架构);来支持我们的LCN框架,以及他们的数据库。
3.2.核心步骤(LCN核心的三步骤)
首先讲解下什么是事务组:事务组是指的我们在整个事务过程中把各个节点(微服务)单元的事务信息存储在一个固定单元里。但这个信息并不是代表是事务信息,而是只是作为一个模块的标示信息。
创建事务组
是指在事务发起方代码开始执行业务之前先调用TxManager创建事务组对象,然后拿到事务标示GroupId的过程。(这里的groupId表示的是一次事务的唯一标示,就是指我们一次事务中,会有一个groupId存在。其次这里讲述在开始执行业务之前,是因为框架是基于切面的思想,那么切面就切面到了这个方法的业务执行过程中,然后又一个around切面。再然后在开始之前,在业务没有调用业务之前会先调用TxManager去创建事务组,然后创建完事务组以后Txmanager会返回事务组信息,事务组信息中就包含GroupId这个参数,这也就是作为的创建事务组。创建完事务组之后就相当于已经产生一个事务组标示。作为一个串联过程,识别为同一次事务的一个过程)
添加事务组
添加事务组是指参与方在执行完业务方法以后,将该模块的事务信息添加通知给TxManager的操作。
关闭事务组
是指在发起方执行完业务代码以后(执行到这个地方就表明没有错误和异常,也就是执行结束了,否则并不会执行到这个地方),将发起方执行结果状态通知给TxManager的动作。当执行完关闭事务组的方法以后,TxManager将根据事务组信息来通知相应的参与模块提交或回滚事务。
3.3.事务协调机制

如图:假设服务已经执行到关闭事务组的过程,那么接下来作为一个模块执行通知给TxManager,然后告诉他本次事务已经完成。那么如图中Txmanager下一个动作就是通过事务组的id,然后获取到本次事务组的事务信息;然后查看一下对应有那几个模块参与,然后如果是有A/B/C三个模块;那么对应的对三个模块做通知、提交、回滚。
那么提交的时候是提交给谁呢?
是提交给了我们的TxClient模块。然后TxCliient模块下有一个连接池,就是框架自定义的一个连接池(如图DB连接池);这个连接池其实就是在没有通知事务之前一直占有着这次事务的连接资源,就是没有释放。但是他在切面里面执行了close方法。在执行close的时候。如果需要(TxManager)分布式事务框架的连接。他被叫做“假关闭”,也就是没有关闭,只是在执行了一次关闭方法。实际的资源是没有释放的。这个资源是掌握在LCN的连接池里的。
然后当TxManager通知提交或事务回滚的时候呢?
TxManager会通知我们的TxClient端。然后TxClient会去执行相应的提交或回滚。提交或回滚之后再去关闭连接,然后在返回给DB连接池。这就只事务的协调机制。说白了就是代理DataSource的机制;相当于是拦截了一下连接池,控制了连接池的事务提交。
LCN事务控制原理是由事务模块TxClient下的代理连接池与TxManager的协调配合完成的事务协调控制。
TxClient的代理连接池实现了javax.sql.DataSource接口,并重写了close方法,事务模块在提交关闭以后TxClient连接池将执行”假关闭”操作,等待TxManager协调完成事务以后在关闭连接。
3.4.对于代理连接池的优化
1.自动超时机制 任何通讯都有最大超时限制,参与模块在等待通知的状态下也有最大超时限制,当超过时间限制以后事务模块将先确认事务状态,然后再决定执行提交或者回滚操作,主要为了给最大资源占用时间加上限制。 2.智能识别创建不同的连接 对于只读操作、非事务操作LCN将不开启代理功能,返回本地连接对象,对于补偿事务的启动方将开启回滚连接对象,执行完业务以后马上回滚事务。 3.LCN连接重用机制 当模块在同一次事务下被重复执行时,连接资源会被重用,提高连接的使用率。
4.Spring Cloud 整合LCN
4.1.下载LCN工程;
在LCN的github下载:https://github.com/codingapi/tx-lcn/
4.2.配置tx-manager事务协调器
修改其属性文件: (修改下载事务协调服务器的端口、接入的服务注册中心、使用的redis库等的集群或单点配置)
txmanager-start# #服务端口 server.port=8899 #tx-manager不得修改 spring.application.name=tx-manager spring.mvc.static-path-pattern=/ spring.resources.static-locations=classpath:/static/ txmanager-end# #zookeeper地址 #spring.cloud.zookeeper.connect-string=127.0.0.1:2181 #spring.cloud.zookeeper.discovery.preferIpAddress = true #eureka 地址 eureka.client.service-url.defaultZone=http://eurekaserver1:8081/eureka/,http://eurekaserver2:8082/eureka/,http://eurekaserver3:8083/eureka/ eureka.instance.prefer-ip-address=true redis-start# #redis 配置文件,根据情况选择集群或者单机模式 redis 集群环境配置 redis cluster #spring.redis.cluster.nodes=127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003 #spring.redis.cluster.commandTimeout=5000 redis 单点环境配置 #redis #redis主机地址 spring.redis.host=192.168.6.211 #redis主机端口 spring.redis.port=6379 #redis链接密码 spring.redis.password= spring.redis.pool.maxActive=10 spring.redis.pool.maxWait=-1 spring.redis.pool.maxIdle=5 spring.redis.pool.minIdle=0 spring.redis.timeout=0 #redis-end LCN-start# #业务模块与TxManager之间通讯的最大等待时间(单位:秒) #通讯时间是指:发起方与响应方之间完成一次的通讯时间。 #该字段代表的是Tx-Client模块与TxManager模块之间的最大通讯时间,超过该时间未响应本次请求失败。 tm.transaction.netty.delaytime = 5 #业务模块与TxManager之间通讯的心跳时间(单位:秒) tm.transaction.netty.hearttime = 15 #存储到redis下的数据最大保存时间(单位:秒) #该字段仅代表的事务模块数据的最大保存时间,补偿数据会永久保存。 tm.redis.savemaxtime=30 #socket server Socket对外服务端口 #TxManager的LCN协议的端口 tm.socket.port=9999 #最大socket连接数 #TxManager最大允许的建立连接数量 tm.socket.maxconnection=100 #事务自动补偿 (true:开启,false:关闭) # 说明: # 开启自动补偿以后,必须要配置 tm.compensate.notifyUrl 地址,仅当tm.compensate.notifyUrl 在请求补偿确认时返回success或者SUCCESS时,才会执行自动补偿,否则不会自动补偿。 # 关闭自动补偿,当出现数据时也会 tm.compensate.notifyUrl 地址。 # 当tm.compensate.notifyUrl 无效时,不影响TxManager运行,仅会影响自动补偿。 tm.compensate.auto=false #事务补偿记录回调地址(rest api 地址,post json格式) #请求补偿是在开启自动补偿时才会请求的地址。请求分为两种:1.补偿决策,2.补偿结果通知,可通过通过action参数区分compensate为补偿请求、notify为补偿通知。 #*注意当请求补偿决策时,需要补偿服务返回"SUCCESS"字符串以后才可以执行自动补偿。 #请求补偿结果通知则只需要接受通知即可。 #请求补偿的样例数据格式: #{"groupId":"TtQxTwJP","action":"compensate","json":"{\"address\":\"133.133.5.100:8081\",\"className\":\"com.example.demo.service.impl.DemoServiceImpl\",\"currentTime\":13,\"data\":\"C5IBLWNvbS5leGFtcGxlLmRlbW8uc2VydmljZS5pbXBsLkRlbW9TZXJ2aWNlSW1wbAwSBHNhdmUbehBqYXZhLmxhbmcuT2JqZWN0GAAQARwjeg9qYXZhLmxhbmcuQ2xhc3MYABABJCo/cHVibGljIGludCBjb20uZXhhbXBsZS5kZW1vLnNlcnZpY2UuaW1wbC5EZW1vU2VydmljZUltcGwuc2F2ZSgp\",\"groupId\":\"TtQxTwJP\",\"methodStr\":\"public int com.example.demo.service.impl.DemoServiceImpl.save()\",\"model\":\"demo1\",\"state\":0,\"time\":36,\"txGroup\":{\"groupId\":\"TtQxTwJP\",\"hasOver\":1,\"isCompensate\":0,\"list\":[{\"address\":\"133.133.5.100:8899\",\"isCompensate\":0,\"isGroup\":0,\"kid\":\"wnlEJoSl\",\"methodStr\":\"public int com.example.demo.service.impl.DemoServiceImpl.save()\",\"model\":\"demo2\",\"modelIpAddress\":\"133.133.5.100:8082\",\"channelAddress\":\"/133.133.5.100:64153\",\"notify\":1,\"uniqueKey\":\"bc13881a5d2ab2ace89ae5d34d\"}],\"nowTime\":0,\"startTime\":79,\"state\":1},\"uniqueKey\":\"be6eea31e382f1f0878d07cef319e4d7\"}"} #请求补偿的返回数据样例数据格式: #SUCCESS #请求补偿结果通知的样例数据格式: #{"resState":true,"groupId":"TtQxTwJP","action":"notify"} tm.compensate.notifyUrl=http://ip:port/path #补偿失败,再次尝试间隔(秒),最大尝试次数3次,当超过3次即为补偿失败,失败的数据依旧还会存在TxManager下。 tm.compensate.tryTime=30 #各事务模块自动补偿的时间上限(毫秒) #指的是模块执行自动超时的最大时间,该最大时间若过段会导致事务机制异常,该时间必须要模块之间通讯的最大超过时间。 #例如,若模块A与模块B,请求超时的最大时间是5秒,则建议改时间至少大于5秒。 tm.compensate.maxWaitTime=5000 LCN-end# logging.level.com.codingapi=debug
4.3.启动事务协调者
4.4 事务参与方配置
假定:事务参与方已经是正常运行的服务提供者。样例中的数据库是mysql,连接池采用druid;
4.4.1 pom文件引入LCN db插件和springcloud支持:
<properties> <lcn.last.version>4.1.0
lcn.last.version>
properties>
<dependency> <groupId>com.codingapi
groupId> <artifactId>transaction-springcloud
artifactId> <version>${lcn.last.version}
version> <exclusions> <exclusion> <groupId>org.slf4j
groupId> <artifactId>*
artifactId>
exclusion>
exclusions>
dependency> <dependency> <groupId>com.codingapi
groupId> <artifactId>tx-plugins-db
artifactId> <version>${lcn.last.version}
version> <exclusions> <exclusion> <groupId>org.slf4j
groupId> <artifactId>*
artifactId>
exclusion>
exclusions>
dependency>
4.4.2 yml添加配置
tm: manager: url: http://127.0.0.1:8899/tx/manager/
4.4.3 添加TxManagerTxUrlService到spring中
package com.mark.springcloud.service.impl; import com.codingapi.tx.config.service.TxManagerTxUrlService; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; / * 添加从注册中心获取url;注意通过注解放入容器。 */ @Service public class TxManagerTxUrlServiceImpl implements TxManagerTxUrlService{
@Value("${tm.manager.url}") private String url; @Override public String getTxUrl() {
return url; } }
4.4.4 事务参与方服务:
package com.mark.springcloud.service.impl; import java.util.List; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import com.codingapi.tx.annotation.ITxTransaction; import com.mark.springcloud.dao.DeptDao; import com.mark.springcloud.entities.Dept; import com.mark.springcloud.service.DeptService; / * 注意需要实现 ITxTransaction; */ @Service public class DeptServiceImpl implements DeptService, ITxTransaction {
@Autowired private DeptDao dao; //注意需要开启事务 @Override @Transactional public boolean add(Dept dept) {
boolean rtnValue = dao.addDept(dept); return rtnValue; } }
4.4.5 启动事务参与方
启动spring boot应用。
4.5 事务发起方配置
正常情况下,一个服务一般即可能是事务的发起方也是事务的参与方。(在测试事务发起方、参与方都是同样配置。所以直接略过,只描述发起方特有代码)
4.5.1 参照样例,实现TxManagerHttpRequestService
package com.mark.springcloud.controller; import com.codingapi.tx.netty.service.TxManagerHttpRequestService; import com.lorne.core.framework.utils.http.HttpUtils; import org.springframework.stereotype.Service; / * 常见TxManagerHttpRequestService重写get、post方法; */ @Service public class TxManagerHttpRequestServiceImpl implements TxManagerHttpRequestService{
@Override public String httpGet(String url) {
System.out.println("httpGet-start"); String res = HttpUtils.get(url); System.out.println("httpGet-end"); return res; } @Override public String httpPost(String url, String params) {
System.out.println("httpPost-start"); String res = HttpUtils.post(url,params); System.out.println("httpPost-end"); return res; } }
4.5.2 事务发起方服务处理
package com.mark.springcloud.controller; import java.util.List; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.codingapi.tx.annotation.TxTransaction; import com.mark.springcloud.entities.Dept; import com.mark.springcloud.service.DeptClientService; @RestController public class DeptController_Consumer {
@Autowired private DeptClientService service; //@TxTransaction(isStart = true)注解修饰该方法为事务发起方,开启事务组。 @TxTransaction(isStart = true) @RequestMapping(value = "/consumer/dept/add") public Object add(Dept dept) {
Object rtnObj = this.service.add(dept); int x = (int)(Math.random()*10); //事务发起方随机数小于5时,抛出异常,则事务参与方事务会回滚。否则正常执行,事务参与方事务正常提交。 if (x < 5) {
int m = 1/0; } return rtnObj; } }
4.5.3 启动事务发起方
启动spring boot 应用。
4.6.测试事务
调用事务发起方服务,事务正常受事务协调者控制,当发起方和参与方都正常执行无异常时,事务正常提交,否则回滚。
4.7.总结
Spring Cloud 集成LCN进行分布式事务控制使用简单,整个原理也很清晰。
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/223126.html原文链接:https://javaforall.net
