1 XA模式示例

示例参考:github上seata-sample
业务代码层面和xa完全相同,仅数据库代理层面替换成DataSourceProxyXA即可,更多内容,参考示例。

2 架构


注:此图来自seata官网。

3 源码分析

3.1 TM开启全局事务

此过程和AT模式一样,使用@GlobalTransactional即可。

3.2 RM执行分支事务

因为DataSource使用了代理,所以所有DB操作均交个DataSourceProxyXA完成,当执行db操作时,请求将会由ExecuteTemplateXA执行。

3.2.1 执行sql:ExecuteTemplateXA#execute

在以下代码中我们可以看到,其主要逻辑如下:

  • 记录下事务提交方式,自动提交 or 非自动提交
  • 事务提交方式改为非自动提交,见:connectionProxyXA.setAutoCommit方法。这一步将向TC注册分支事务。
  • 执行DB操作。
  • 如果执行失败了,那么rollback,见:connectionProxyXA.rollback()
  • 如果开始时事务为自动提交,那么commit,见:connectionProxyXA.commit()
public static <T, S extends Statement> T execute(AbstractConnectionProxyXA connectionProxyXA, StatementCallback<T, S> statementCallback, S targetStatement, Object... args) throws SQLException {boolean autoCommitStatus = connectionProxyXA.getAutoCommit();if (autoCommitStatus) {// XA StartconnectionProxyXA.setAutoCommit(false);}try {T res = null;try {// execute SQLres = statementCallback.execute(targetStatement, args);} catch (Throwable ex) {if (autoCommitStatus) {// XA End & Rollbacktry {connectionProxyXA.rollback();} catch (SQLException sqle) {// log and ignore the rollback failure.LOGGER.warn("Failed to rollback xa branch of " + connectionProxyXA.xid +"(caused by SQL execution failure(" + ex.getMessage() + ") since " + sqle.getMessage(),sqle);}}if (ex instanceof SQLException) {throw ex;} else {throw new SQLException(ex);}}if (autoCommitStatus) {try {// XA End & PrepareconnectionProxyXA.commit();} catch (Throwable ex) {LOGGER.warn("Failed to commit xa branch of " + connectionProxyXA.xid + ") since " + ex.getMessage(),ex);// XA End & Rollbackif (!(ex instanceof SQLException) || !AbstractConnectionProxyXA.SQLSTATE_XA_NOT_END.equalsIgnoreCase(((SQLException) ex).getSQLState())) {try {connectionProxyXA.rollback();} catch (SQLException sqle) {// log and ignore the rollback failure.LOGGER.warn("Failed to rollback xa branch of " + connectionProxyXA.xid +"(caused by commit failure(" + ex.getMessage() + ") since " + sqle.getMessage(),sqle);}}if (ex instanceof SQLException) {throw ex;} else {throw new SQLException(ex);}}}return res;} finally {if (autoCommitStatus) {connectionProxyXA.setAutoCommit(true);}}
}

3.2.2 设置自动提交方式:ConnectionProxyXA#setAutoCommit

  • 向TC注册分支事务。
  • 将当前分支事务id和xaResource绑定起来,即:xaResource.start
public void setAutoCommit(boolean autoCommit) throws SQLException {if (currentAutoCommitStatus == autoCommit) {return;}if (autoCommit) {// According to JDBC spec:// If this method is called during a transaction and the// auto-commit mode is changed, the transaction is committed.if (xaActive) {commit();}} else {if (xaActive) {throw new SQLException("should NEVER happen: setAutoCommit from true to false while xa branch is active");}// Start a XA branchlong branchId = 0L;try {// 1. register branch to TC then get the branchIdbranchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null,null);} catch (TransactionException te) {cleanXABranchContext();throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te);}// 2. build XA-Xid with xid and branchIdthis.xaBranchXid = XAXidBuilder.build(xid, branchId);try {// 3. XA StartxaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS);} catch (XAException e) {cleanXABranchContext();throw new SQLException("failed to start xa branch " + xid + " since " + e.getMessage(), e);}// 4. XA is activethis.xaActive = true;}currentAutoCommitStatus = autoCommit;
}

3.2.3 设置自动提交方式:ConnectionProxyXA#commit

  • 终止xaBranchXid分支所执行的工作,即:xaResource.end
  • 请求资源管理器准备好xaBranchXid事务的提交工作,即:xaResource.prepare
public void commit() throws SQLException {if (currentAutoCommitStatus) {// Ignore the committing on an autocommit session.return;}if (!xaActive || this.xaBranchXid == null) {throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END);}try {// XA End: SuccessxaResource.end(xaBranchXid, XAResource.TMSUCCESS);// XA PreparexaResource.prepare(xaBranchXid);// Keep the Connection if necessarykeepIfNecessary();} catch (XAException xe) {try {// Branch Report to TC: FailedDefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(),BranchStatus.PhaseOne_Failed, null);} catch (TransactionException te) {LOGGER.warn("Failed to report XA branch commit-failure on " + xid + "-" + xaBranchXid.getBranchId()+ " since " + te.getCode() + ":" + te.getMessage() + " and XAException:" + xe.getMessage());}throw new SQLException("Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe.getMessage(), xe);} finally {cleanXABranchContext();}
}

3.2.4 设置自动提交方式:ConnectionProxyXA#rollback

  • 向TC汇报事务执行失败。
  • 终止xaBranchXid分支所执行的工作,即:xaResource.end
public void rollback() throws SQLException {if (currentAutoCommitStatus) {// Ignore the committing on an autocommit session.return;}if (!xaActive || this.xaBranchXid == null) {throw new SQLException("should NOT rollback on an inactive session");}try {// Branch Report to TCDefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(), BranchStatus.PhaseOne_Failed, null);} catch (TransactionException te) {// log and ignore the report failureLOGGER.warn("Failed to report XA branch rollback on " + xid + "-" + xaBranchXid.getBranchId()+ " since " + te.getCode() + ":" + te.getMessage());}try {// XA End: FailxaResource.end(xaBranchXid, XAResource.TMFAIL);} catch (XAException xe) {throw new SQLException("Failed to end(TMFAIL) xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe.getMessage(), xe);} finally {cleanXABranchContext();}
}

3.3 TM向TC发送请求,通知进行Global Commit/Rollback

这个过程和AT模式完全相同。

3.4 TC向各个分支发送Branch Commit/Rollback请求

这个过程和AT模式完全相同。

3.5 RM进行Branch Commit/Rollback

AbstractRMHandler收到请求后,最终交给ResourceManagerXA处理,具体的Commit和Rollback交个xaCommit、xaRollback完成,他们直接触发xaResource的commit、rollback方法。

public void xaCommit(String xid, long branchId, String applicationData) throws XAException {XAXid xaXid = XAXidBuilder.build(xid, branchId);xaResource.commit(xaXid, false);releaseIfNecessary();
}public void xaRollback(String xid, long branchId, String applicationData) throws XAException {XAXid xaXid = XAXidBuilder.build(xid, branchId);xaResource.rollback(xaXid);releaseIfNecessary();
}

3.6 异常补偿

异常补偿流程和AT模式完全相同。

4 XAResource

从上面源码层面可以看到,Seata的XA模式是基于jdk的XAResource接口实现的。以下是jdk文档对此接口的描述。

4.1 XAResource简介

XAResource 接口是基于X/Open CAE规范(分布式事务处理:XA 规范)的工业标准XA接口的Java映射。

在分布式事务处理 (DTP) 环境中,XA接口定义资源管理器和事务管理器之间的协定。JDBC驱动程序或JMS提供者实现此接口,以支持全局事务与数据库或消息服务连接之间的关联。

可由应用程序在外部事务管理器控制事务的环境中使用的任何事务资源均可支持XAResource接口。数据库管理系统就属于此类资源。应用程序可以通过多个数据库连接访问数据。通过事务管理器将每个数据库连接作为事务资源添加到列表中。事务管理器为参与全局事务的每个连接获取XAResource。事务管理器使用start方法建立全局事务与资源之间的关联,而使用end方法取消事务与资源之间的关联。资源管理器负责将全局事务关联到在start与end方法调用之间对其数据执行的所有工作。

在事务提交时,事务管理器通知资源管理器根据二阶段提交协议准备、提交或回滚事务。

4.2 方法摘要

返回结果 方法签名 描述
void commit(Xid xid, boolean onePhase) 提交 xid 指定的全局事务。
void end(Xid xid, int flags) 终止代表事务分支所执行的工作。
void forget(Xid xid) 告知资源管理器忽略以启发式完成的事务分支。
int getTransactionTimeout() 获取为此 XAResource 实例设置的当前事务超时值。
boolean isSameRM(XAResource xares) 调用此方法,以确定目标对象表示的资源管理器实例是否与参数xares表示的资源管理器实例相同。
int prepare(Xid xid) 请求资源管理器准备好 xid 中指定的事务的事务提交工作。
Xid[] recover(int flag) 从资源管理器获取准备的事务分支的列表。
void rollback(Xid xid) 通知资源管理器回滚代表事务分支执行的工作。
boolean setTransactionTimeout(int seconds) 为此 XAResource 实例设置当前事务超时值。
void start(Xid xid, int flags) 代表 xid 中指定的事务分支开始工作。

4.3 字段摘要

类型 属性 描述
static int TMENDRSCAN 终止恢复扫描。
static int TMFAIL 取消关联调用者,并将事务分支标记为只回滚。
static int TMJOIN 调用者正连接现有事务分支。
static int TMNOFLAGS 使用 TMNOFLAGS 指示不选择任何标志值。
static int TMONEPHASE 调用者正在使用一阶段优化。
static int TMRESUME 调用者正在恢复与挂起的事务分支的关联。
static int TMSTARTRSCAN 启动恢复扫描。
static int TMSUCCESS 取消调用者与事务分支的关联。
static int TMSUSPEND 调用者正挂起(不是终止)其与事务分支的关联。
static int XA_OK 事务工作正常准备就绪。
static int XA_RDONLY 事务分支是只读的,并且已提交。

5 参考文档

Seata XA 模式

Seata 原理

Seata-AT模式 原理

Seata-TCC模式 原理

Seata-Saga模式 原理

Seata-XA模式 原理

TCC-Transaction原理

Seata-XA模式 原理相关推荐

  1. 还不会分布式事务,seata xa模式入门实战送上

    文章目录 前言 一.什么是seata? 二.seata原理说明 1.角色说明 2.什么是 Seata 的事务模式? 三.SEATA 的分布式案例 1.业务逻辑说明 2.架构图 3.SEATA 的分布式 ...

  2. Seata XA 模式示例分析

    文章目录 1 下载示例 2 示例结构 3 业务服务 business-xa 3.1 模块结构 3.2 Controller 层 3.3 Service 层 3.4 stock Feign 客户端 3. ...

  3. Seata XA 模式理论学习、使用及注意事项 | Spring Cloud54

    一.前言 通过以下系列章节: docker-compose 实现Seata Server高可用部署 | Spring Cloud 51 Seata AT 模式理论学习.事务隔离及部分源码解析 | Sp ...

  4. 阿里分布式事务框架Seata,AT模式原理解析

    什么是分布式事务 如今在分布式技术盛行下,许多公司都已经在使用分布式技术了,虽然分布式技术给我们项目带来了三高(高可用,高扩展,高性能)等优点,但是缺点也很明显,分布式项目一般都是分服务开发,且多个服 ...

  5. 6 张图带你彻底搞懂分布式事务 XA 模式

    作者 | 朱晋君 来源 | 阿里巴巴云原生公众号 XA 协议是由 X/Open 组织提出的分布式事务处理规范,主要定义了事务管理器 TM 和局部资源管理器 RM 之间的接口.目前主流的数据库,比如 o ...

  6. 【Seata】Seata AT和XA模式联系和区别

    文章目录 概述 一.分布式事务产生得原因: 1.1.数据库分库分表 1.2应用SOA化 二.分布式事务解决方案XA模式 三.Seata AT(TXC) 模式 3.1基本概念 3.2AT模式工作流程 3 ...

  7. xa 全局锁_分布式事务如何实现?深入解读 Seata 的 XA 模式

    原标题:分布式事务如何实现?深入解读 Seata 的 XA 模式 作者简介:煊檍,GitHub ID:sharajava,阿里巴巴中件间 GTS 研发团队负责人,SEATA 开源项目发起人,曾在 Or ...

  8. Seata TCC、Saga、XA模式初识

    TCC模式 TCC(Try Confirm Cancel)同样也是两阶段提交: 一阶段prepare行为 两阶段commit或rollback行为 和AT模式不同之处在于解决了AT基于支持本地ACID ...

  9. 深入解读 Seata 的 XA 模式

    Seata 1.2.0 版本重磅发布新的事务模式:XA 模式,实现对 XA 协议的支持. 这里,我们从三个方面来深入解读这个新的特性: 1.是什么(What):XA 模式是什么? 2.为什么(Why) ...

最新文章

  1. 零起点学算法11——求梯形面积
  2. memcache和memcached的区别
  3. gson生成jsonobject_GSON形式生成和解析json数据
  4. 游戏性能优化技术干货分享——内存管理
  5. 反射练习之越过泛型检查
  6. Postman的console视图
  7. 【英语学习】【WOTD】gormless 释义/词源/示例
  8. CSS框架学习资料汇总
  9. 人工智能基础——2.3.2产生式系统
  10. 人工智能对智能建筑有哪些影响,智能建筑发展存在哪些问题?
  11. Docker 中 latest 标签引发的困惑
  12. 网易 UI 自动化工具 Airtest
  13. Restful风格的springMVC配搭ajax请求的小例子
  14. 达梦数据文件误删了恢复
  15. 国外数据平台统计分析sdk
  16. 【shell】笔记|去重复行|删除匹配行|反选删除|反向显示|加减乘除
  17. 【Unity】获取免费可商用的中文像素字体
  18. 新手Python爬虫教学(Request+BeautifulSoup)
  19. android获取各种路径的方法
  20. xeLaTex调用系统字体

热门文章

  1. 湖南大学2022年计算机考研复试时间
  2. python中documentelement_python网络编程学习笔记(八):XML生成与解析(DOM、ElementTree)...
  3. 《高效时间管理》--司铭宇老师
  4. 前端那些好看的视觉效果集合
  5. 东财《劳动法X》综合作业
  6. 如何快速识别对方的沟通类型?
  7. arduino 时间灯控
  8. Scala基础教程--18--集合(二)
  9. CSV逗号分隔文件打开汉字乱码解决方法
  10. eventhandler java_用 EventHandler 进行事件监听