1 XA模式示例
2 架构

注:此图来自seata官网。
3 源码分析
3.1 TM开启全局事务
此过程和AT模式一样,使用@GlobalTransactional即可。
3.2 RM执行分支事务
因为DataSource使用了代理,所以所有DB操作均交个DataSourceProxyXA完成,当执行db操作时,请求将会由ExecuteTemplateXA执行。
3.2.1 执行sql:ExecuteTemplateXA#execute
在以下代码中我们可以看到,其主要逻辑如下:
- 记录下事务提交方式,自动提交 or 非自动提交
- 事务提交方式改为非自动提交,见:connectionProxyXA.setAutoCommit方法。这一步将向TC注册分支事务。
- 执行DB操作。
- 如果执行失败了,那么rollback,见:connectionProxyXA.rollback()
- 如果开始时事务为自动提交,那么commit,见:connectionProxyXA.commit()
public static
T execute(AbstractConnectionProxyXA connectionProxyXA, StatementCallback
statementCallback, S targetStatement, Object... args) throws SQLException { boolean autoCommitStatus = connectionProxyXA.getAutoCommit(); if (autoCommitStatus) { // XA Start connectionProxyXA.setAutoCommit(false); } try { T res = null; try { // execute SQL res = statementCallback.execute(targetStatement, args); } catch (Throwable ex) { if (autoCommitStatus) { // XA End & Rollback try { connectionProxyXA.rollback(); } catch (SQLException sqle) { // log and ignore the rollback failure. LOGGER.warn( "Failed to rollback xa branch of " + connectionProxyXA.xid + "(caused by SQL execution failure(" + ex.getMessage() + ") since " + sqle.getMessage(), sqle); } } if (ex instanceof SQLException) { throw ex; } else { throw new SQLException(ex); } } if (autoCommitStatus) { try { // XA End & Prepare connectionProxyXA.commit(); } catch (Throwable ex) { LOGGER.warn( "Failed to commit xa branch of " + connectionProxyXA.xid + ") since " + ex.getMessage(), ex); // XA End & Rollback if (!(ex instanceof SQLException) || !AbstractConnectionProxyXA.SQLSTATE_XA_NOT_END.equalsIgnoreCase(((SQLException) ex).getSQLState())) { try { connectionProxyXA.rollback(); } catch (SQLException sqle) { // log and ignore the rollback failure. LOGGER.warn( "Failed to rollback xa branch of " + connectionProxyXA.xid + "(caused by commit failure(" + ex.getMessage() + ") since " + sqle.getMessage(), sqle); } } if (ex instanceof SQLException) { throw ex; } else { throw new SQLException(ex); } } } return res; } finally { if (autoCommitStatus) { connectionProxyXA.setAutoCommit(true); } } }
3.2.2 设置自动提交方式:ConnectionProxyXA#setAutoCommit
- 向TC注册分支事务。
- 将当前分支事务id和xaResource绑定起来,即:xaResource.start
public void setAutoCommit(boolean autoCommit) throws SQLException { if (currentAutoCommitStatus == autoCommit) { return; } if (autoCommit) { // According to JDBC spec: // If this method is called during a transaction and the // auto-commit mode is changed, the transaction is committed. if (xaActive) { commit(); } } else { if (xaActive) { throw new SQLException("should NEVER happen: setAutoCommit from true to false while xa branch is active"); } // Start a XA branch long branchId = 0L; try { // 1. register branch to TC then get the branchId branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null, null); } catch (TransactionException te) { cleanXABranchContext(); throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te); } // 2. build XA-Xid with xid and branchId this.xaBranchXid = XAXidBuilder.build(xid, branchId); try { // 3. XA Start xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS); } catch (XAException e) { cleanXABranchContext(); throw new SQLException("failed to start xa branch " + xid + " since " + e.getMessage(), e); } // 4. XA is active this.xaActive = true; } currentAutoCommitStatus = autoCommit; }
3.2.3 设置自动提交方式:ConnectionProxyXA#commit
- 终止xaBranchXid分支所执行的工作,即:xaResource.end
- 请求资源管理器准备好xaBranchXid事务的提交工作,即:xaResource.prepare
public void commit() throws SQLException { if (currentAutoCommitStatus) { // Ignore the committing on an autocommit session. return; } if (!xaActive || this.xaBranchXid == null) { throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END); } try { // XA End: Success xaResource.end(xaBranchXid, XAResource.TMSUCCESS); // XA Prepare xaResource.prepare(xaBranchXid); // Keep the Connection if necessary keepIfNecessary(); } catch (XAException xe) { try { // Branch Report to TC: Failed DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(), BranchStatus.PhaseOne_Failed, null); } catch (TransactionException te) { LOGGER.warn("Failed to report XA branch commit-failure on " + xid + "-" + xaBranchXid.getBranchId() + " since " + te.getCode() + ":" + te.getMessage() + " and XAException:" + xe.getMessage()); } throw new SQLException( "Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe .getMessage(), xe); } finally { cleanXABranchContext(); } }
3.2.4 设置自动提交方式:ConnectionProxyXA#rollback
- 向TC汇报事务执行失败。
- 终止xaBranchXid分支所执行的工作,即:xaResource.end
public void rollback() throws SQLException { if (currentAutoCommitStatus) { // Ignore the committing on an autocommit session. return; } if (!xaActive || this.xaBranchXid == null) { throw new SQLException("should NOT rollback on an inactive session"); } try { // Branch Report to TC DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(), BranchStatus.PhaseOne_Failed, null); } catch (TransactionException te) { // log and ignore the report failure LOGGER.warn("Failed to report XA branch rollback on " + xid + "-" + xaBranchXid.getBranchId() + " since " + te.getCode() + ":" + te.getMessage()); } try { // XA End: Fail xaResource.end(xaBranchXid, XAResource.TMFAIL); } catch (XAException xe) { throw new SQLException( "Failed to end(TMFAIL) xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe .getMessage(), xe); } finally { cleanXABranchContext(); } }
3.3 TM向TC发送请求,通知进行Global Commit/Rollback
这个过程和AT模式完全相同。
3.4 TC向各个分支发送Branch Commit/Rollback请求
这个过程和AT模式完全相同。
3.5 RM进行Branch Commit/Rollback
AbstractRMHandler收到请求后,最终交给ResourceManagerXA处理,具体的Commit和Rollback交个xaCommit、xaRollback完成,他们直接触发xaResource的commit、rollback方法。
public void xaCommit(String xid, long branchId, String applicationData) throws XAException { XAXid xaXid = XAXidBuilder.build(xid, branchId); xaResource.commit(xaXid, false); releaseIfNecessary(); } public void xaRollback(String xid, long branchId, String applicationData) throws XAException { XAXid xaXid = XAXidBuilder.build(xid, branchId); xaResource.rollback(xaXid); releaseIfNecessary(); }
3.6 异常补偿
异常补偿流程和AT模式完全相同。
4 XAResource
从上面源码层面可以看到,Seata的XA模式是基于jdk的XAResource接口实现的。以下是jdk文档对此接口的描述。
4.1 XAResource简介
XAResource 接口是基于X/Open CAE规范(分布式事务处理:XA 规范)的工业标准XA接口的Java映射。
在分布式事务处理 (DTP) 环境中,XA接口定义资源管理器和事务管理器之间的协定。JDBC驱动程序或JMS提供者实现此接口,以支持全局事务与数据库或消息服务连接之间的关联。
可由应用程序在外部事务管理器控制事务的环境中使用的任何事务资源均可支持XAResource接口。数据库管理系统就属于此类资源。应用程序可以通过多个数据库连接访问数据。通过事务管理器将每个数据库连接作为事务资源添加到列表中。事务管理器为参与全局事务的每个连接获取XAResource。事务管理器使用start方法建立全局事务与资源之间的关联,而使用end方法取消事务与资源之间的关联。资源管理器负责将全局事务关联到在start与end方法调用之间对其数据执行的所有工作。
在事务提交时,事务管理器通知资源管理器根据二阶段提交协议准备、提交或回滚事务。
4.2 方法摘要
| 返回结果 | 方法签名 | 描述 |
|---|---|---|
| void | commit(Xid xid, boolean onePhase) | 提交 xid 指定的全局事务。 |
| void | end(Xid xid, int flags) | 终止代表事务分支所执行的工作。 |
| void | forget(Xid xid) | 告知资源管理器忽略以启发式完成的事务分支。 |
| int | getTransactionTimeout() | 获取为此 XAResource 实例设置的当前事务超时值。 |
| boolean | isSameRM(XAResource xares) | 调用此方法,以确定目标对象表示的资源管理器实例是否与参数xares表示的资源管理器实例相同。 |
| int | prepare(Xid xid) | 请求资源管理器准备好 xid 中指定的事务的事务提交工作。 |
| Xid[] | recover(int flag) | 从资源管理器获取准备的事务分支的列表。 |
| void | rollback(Xid xid) | 通知资源管理器回滚代表事务分支执行的工作。 |
| boolean | setTransactionTimeout(int seconds) | 为此 XAResource 实例设置当前事务超时值。 |
| void | start(Xid xid, int flags) | 代表 xid 中指定的事务分支开始工作。 |
4.3 字段摘要
| 类型 | 属性 | 描述 |
|---|---|---|
| static int | TMENDRSCAN | 终止恢复扫描。 |
| static int | TMFAIL | 取消关联调用者,并将事务分支标记为只回滚。 |
| static int | TMJOIN | 调用者正连接现有事务分支。 |
| static int | TMNOFLAGS | 使用 TMNOFLAGS 指示不选择任何标志值。 |
| static int | TMONEPHASE | 调用者正在使用一阶段优化。 |
| static int | TMRESUME | 调用者正在恢复与挂起的事务分支的关联。 |
| static int | TMSTARTRSCAN | 启动恢复扫描。 |
| static int | TMSUCCESS | 取消调用者与事务分支的关联。 |
| static int | TMSUSPEND | 调用者正挂起(不是终止)其与事务分支的关联。 |
| static int | XA_OK | 事务工作正常准备就绪。 |
| static int | XA_RDONLY | 事务分支是只读的,并且已提交。 |
5 参考文档
Seata XA 模式
Seata 原理
Seata-AT模式 原理
Seata-TCC模式 原理
Seata-Saga模式 原理
Seata-XA模式 原理
TCC-Transaction原理
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/227083.html原文链接:https://javaforall.net
