Seata-XA模式 原理

Seata-XA模式 原理1XA 模式示例示例参考 github 上 seata sample 业务代码层面和 xa 完全相同 仅数据库代理层面替换成 DataSourcePr 即可 更多内容 参考示例 2 架构注 此图来自 seata 官网 3 源码分析 3 1TM 开启全局事务此过程和 AT 模式一样 使用 GlobalTransa 即可 3 2RM 执行分支事务因为 DataSource 使用了代理 所以所有 DB 操作均交个 DataSourcePr 完成 当执行 db 操作时 请求将会由 ExecuteTempl

1 XA模式示例

2 架构

在这里插入图片描述
注:此图来自seata官网。

3 源码分析

3.1 TM开启全局事务

此过程和AT模式一样,使用@GlobalTransactional即可。

3.2 RM执行分支事务

因为DataSource使用了代理,所以所有DB操作均交个DataSourceProxyXA完成,当执行db操作时,请求将会由ExecuteTemplateXA执行。

3.2.1 执行sql:ExecuteTemplateXA#execute

在以下代码中我们可以看到,其主要逻辑如下:

  • 记录下事务提交方式,自动提交 or 非自动提交
  • 事务提交方式改为非自动提交,见:connectionProxyXA.setAutoCommit方法。这一步将向TC注册分支事务。
  • 执行DB操作。
  • 如果执行失败了,那么rollback,见:connectionProxyXA.rollback()
  • 如果开始时事务为自动提交,那么commit,见:connectionProxyXA.commit()
public static 
  
    T execute(AbstractConnectionProxyXA connectionProxyXA, StatementCallback 
   
     statementCallback, S targetStatement, Object... args) throws SQLException { boolean autoCommitStatus = connectionProxyXA.getAutoCommit(); if (autoCommitStatus) { // XA Start connectionProxyXA.setAutoCommit(false); } try { T res = null; try { // execute SQL res = statementCallback.execute(targetStatement, args); } catch (Throwable ex) { if (autoCommitStatus) { // XA End & Rollback try { connectionProxyXA.rollback(); } catch (SQLException sqle) { // log and ignore the rollback failure. LOGGER.warn( "Failed to rollback xa branch of " + connectionProxyXA.xid + "(caused by SQL execution failure(" + ex.getMessage() + ") since " + sqle.getMessage(), sqle); } } if (ex instanceof SQLException) { throw ex; } else { throw new SQLException(ex); } } if (autoCommitStatus) { try { // XA End & Prepare connectionProxyXA.commit(); } catch (Throwable ex) { LOGGER.warn( "Failed to commit xa branch of " + connectionProxyXA.xid + ") since " + ex.getMessage(), ex); // XA End & Rollback if (!(ex instanceof SQLException) || !AbstractConnectionProxyXA.SQLSTATE_XA_NOT_END.equalsIgnoreCase(((SQLException) ex).getSQLState())) { try { connectionProxyXA.rollback(); } catch (SQLException sqle) { // log and ignore the rollback failure. LOGGER.warn( "Failed to rollback xa branch of " + connectionProxyXA.xid + "(caused by commit failure(" + ex.getMessage() + ") since " + sqle.getMessage(), sqle); } } if (ex instanceof SQLException) { throw ex; } else { throw new SQLException(ex); } } } return res; } finally { if (autoCommitStatus) { connectionProxyXA.setAutoCommit(true); } } } 
    
  

3.2.2 设置自动提交方式:ConnectionProxyXA#setAutoCommit

  • 向TC注册分支事务。
  • 将当前分支事务id和xaResource绑定起来,即:xaResource.start
public void setAutoCommit(boolean autoCommit) throws SQLException { if (currentAutoCommitStatus == autoCommit) { return; } if (autoCommit) { // According to JDBC spec: // If this method is called during a transaction and the // auto-commit mode is changed, the transaction is committed. if (xaActive) { commit(); } } else { if (xaActive) { throw new SQLException("should NEVER happen: setAutoCommit from true to false while xa branch is active"); } // Start a XA branch long branchId = 0L; try { // 1. register branch to TC then get the branchId branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null, null); } catch (TransactionException te) { cleanXABranchContext(); throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te); } // 2. build XA-Xid with xid and branchId this.xaBranchXid = XAXidBuilder.build(xid, branchId); try { // 3. XA Start xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS); } catch (XAException e) { cleanXABranchContext(); throw new SQLException("failed to start xa branch " + xid + " since " + e.getMessage(), e); } // 4. XA is active this.xaActive = true; } currentAutoCommitStatus = autoCommit; } 

3.2.3 设置自动提交方式:ConnectionProxyXA#commit

  • 终止xaBranchXid分支所执行的工作,即:xaResource.end
  • 请求资源管理器准备好xaBranchXid事务的提交工作,即:xaResource.prepare
public void commit() throws SQLException { if (currentAutoCommitStatus) { // Ignore the committing on an autocommit session. return; } if (!xaActive || this.xaBranchXid == null) { throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END); } try { // XA End: Success xaResource.end(xaBranchXid, XAResource.TMSUCCESS); // XA Prepare xaResource.prepare(xaBranchXid); // Keep the Connection if necessary keepIfNecessary(); } catch (XAException xe) { try { // Branch Report to TC: Failed DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(), BranchStatus.PhaseOne_Failed, null); } catch (TransactionException te) { LOGGER.warn("Failed to report XA branch commit-failure on " + xid + "-" + xaBranchXid.getBranchId() + " since " + te.getCode() + ":" + te.getMessage() + " and XAException:" + xe.getMessage()); } throw new SQLException( "Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe .getMessage(), xe); } finally { cleanXABranchContext(); } } 

3.2.4 设置自动提交方式:ConnectionProxyXA#rollback

  • 向TC汇报事务执行失败。
  • 终止xaBranchXid分支所执行的工作,即:xaResource.end
public void rollback() throws SQLException { if (currentAutoCommitStatus) { // Ignore the committing on an autocommit session. return; } if (!xaActive || this.xaBranchXid == null) { throw new SQLException("should NOT rollback on an inactive session"); } try { // Branch Report to TC DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(), BranchStatus.PhaseOne_Failed, null); } catch (TransactionException te) { // log and ignore the report failure LOGGER.warn("Failed to report XA branch rollback on " + xid + "-" + xaBranchXid.getBranchId() + " since " + te.getCode() + ":" + te.getMessage()); } try { // XA End: Fail xaResource.end(xaBranchXid, XAResource.TMFAIL); } catch (XAException xe) { throw new SQLException( "Failed to end(TMFAIL) xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe .getMessage(), xe); } finally { cleanXABranchContext(); } } 

3.3 TM向TC发送请求,通知进行Global Commit/Rollback

这个过程和AT模式完全相同。

3.4 TC向各个分支发送Branch Commit/Rollback请求

这个过程和AT模式完全相同。

3.5 RM进行Branch Commit/Rollback

AbstractRMHandler收到请求后,最终交给ResourceManagerXA处理,具体的Commit和Rollback交个xaCommit、xaRollback完成,他们直接触发xaResource的commit、rollback方法。

public void xaCommit(String xid, long branchId, String applicationData) throws XAException { XAXid xaXid = XAXidBuilder.build(xid, branchId); xaResource.commit(xaXid, false); releaseIfNecessary(); } public void xaRollback(String xid, long branchId, String applicationData) throws XAException { XAXid xaXid = XAXidBuilder.build(xid, branchId); xaResource.rollback(xaXid); releaseIfNecessary(); } 

3.6 异常补偿

异常补偿流程和AT模式完全相同。

4 XAResource

从上面源码层面可以看到,Seata的XA模式是基于jdk的XAResource接口实现的。以下是jdk文档对此接口的描述。

4.1 XAResource简介

XAResource 接口是基于X/Open CAE规范(分布式事务处理:XA 规范)的工业标准XA接口的Java映射。

在分布式事务处理 (DTP) 环境中,XA接口定义资源管理器和事务管理器之间的协定。JDBC驱动程序或JMS提供者实现此接口,以支持全局事务与数据库或消息服务连接之间的关联。

可由应用程序在外部事务管理器控制事务的环境中使用的任何事务资源均可支持XAResource接口。数据库管理系统就属于此类资源。应用程序可以通过多个数据库连接访问数据。通过事务管理器将每个数据库连接作为事务资源添加到列表中。事务管理器为参与全局事务的每个连接获取XAResource。事务管理器使用start方法建立全局事务与资源之间的关联,而使用end方法取消事务与资源之间的关联。资源管理器负责将全局事务关联到在start与end方法调用之间对其数据执行的所有工作。

在事务提交时,事务管理器通知资源管理器根据二阶段提交协议准备、提交或回滚事务。

4.2 方法摘要

返回结果 方法签名 描述
void commit(Xid xid, boolean onePhase) 提交 xid 指定的全局事务。
void end(Xid xid, int flags) 终止代表事务分支所执行的工作。
void forget(Xid xid) 告知资源管理器忽略以启发式完成的事务分支。
int getTransactionTimeout() 获取为此 XAResource 实例设置的当前事务超时值。
boolean isSameRM(XAResource xares) 调用此方法,以确定目标对象表示的资源管理器实例是否与参数xares表示的资源管理器实例相同。
int prepare(Xid xid) 请求资源管理器准备好 xid 中指定的事务的事务提交工作。
Xid[] recover(int flag) 从资源管理器获取准备的事务分支的列表。
void rollback(Xid xid) 通知资源管理器回滚代表事务分支执行的工作。
boolean setTransactionTimeout(int seconds) 为此 XAResource 实例设置当前事务超时值。
void start(Xid xid, int flags) 代表 xid 中指定的事务分支开始工作。

4.3 字段摘要

类型 属性 描述
static int TMENDRSCAN 终止恢复扫描。
static int TMFAIL 取消关联调用者,并将事务分支标记为只回滚。
static int TMJOIN 调用者正连接现有事务分支。
static int TMNOFLAGS 使用 TMNOFLAGS 指示不选择任何标志值。
static int TMONEPHASE 调用者正在使用一阶段优化。
static int TMRESUME 调用者正在恢复与挂起的事务分支的关联。
static int TMSTARTRSCAN 启动恢复扫描。
static int TMSUCCESS 取消调用者与事务分支的关联。
static int TMSUSPEND 调用者正挂起(不是终止)其与事务分支的关联。
static int XA_OK 事务工作正常准备就绪。
static int XA_RDONLY 事务分支是只读的,并且已提交。

5 参考文档

Seata XA 模式

Seata 原理

Seata-AT模式 原理

Seata-TCC模式 原理

Seata-Saga模式 原理

Seata-XA模式 原理

TCC-Transaction原理

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/227083.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月16日 下午9:59
下一篇 2026年3月16日 下午9:59


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号