Seata-XA模式 原理
1 XA模式示例
示例参考:github上seata-sample
业务代码层面和xa完全相同,仅数据库代理层面替换成DataSourceProxyXA即可,更多内容,参考示例。
2 架构
注:此图来自seata官网。
3 源码分析
3.1 TM开启全局事务
此过程和AT模式一样,使用@GlobalTransactional即可。
3.2 RM执行分支事务
因为DataSource使用了代理,所以所有DB操作均交个DataSourceProxyXA完成,当执行db操作时,请求将会由ExecuteTemplateXA执行。
3.2.1 执行sql:ExecuteTemplateXA#execute
在以下代码中我们可以看到,其主要逻辑如下:
- 记录下事务提交方式,自动提交 or 非自动提交
- 事务提交方式改为非自动提交,见:connectionProxyXA.setAutoCommit方法。这一步将向TC注册分支事务。
- 执行DB操作。
- 如果执行失败了,那么rollback,见:connectionProxyXA.rollback()
- 如果开始时事务为自动提交,那么commit,见:connectionProxyXA.commit()
public static <T, S extends Statement> T execute(AbstractConnectionProxyXA connectionProxyXA, StatementCallback<T, S> statementCallback, S targetStatement, Object... args) throws SQLException {boolean autoCommitStatus = connectionProxyXA.getAutoCommit();if (autoCommitStatus) {// XA StartconnectionProxyXA.setAutoCommit(false);}try {T res = null;try {// execute SQLres = statementCallback.execute(targetStatement, args);} catch (Throwable ex) {if (autoCommitStatus) {// XA End & Rollbacktry {connectionProxyXA.rollback();} catch (SQLException sqle) {// log and ignore the rollback failure.LOGGER.warn("Failed to rollback xa branch of " + connectionProxyXA.xid +"(caused by SQL execution failure(" + ex.getMessage() + ") since " + sqle.getMessage(),sqle);}}if (ex instanceof SQLException) {throw ex;} else {throw new SQLException(ex);}}if (autoCommitStatus) {try {// XA End & PrepareconnectionProxyXA.commit();} catch (Throwable ex) {LOGGER.warn("Failed to commit xa branch of " + connectionProxyXA.xid + ") since " + ex.getMessage(),ex);// XA End & Rollbackif (!(ex instanceof SQLException) || !AbstractConnectionProxyXA.SQLSTATE_XA_NOT_END.equalsIgnoreCase(((SQLException) ex).getSQLState())) {try {connectionProxyXA.rollback();} catch (SQLException sqle) {// log and ignore the rollback failure.LOGGER.warn("Failed to rollback xa branch of " + connectionProxyXA.xid +"(caused by commit failure(" + ex.getMessage() + ") since " + sqle.getMessage(),sqle);}}if (ex instanceof SQLException) {throw ex;} else {throw new SQLException(ex);}}}return res;} finally {if (autoCommitStatus) {connectionProxyXA.setAutoCommit(true);}}
}
3.2.2 设置自动提交方式:ConnectionProxyXA#setAutoCommit
- 向TC注册分支事务。
- 将当前分支事务id和xaResource绑定起来,即:xaResource.start
public void setAutoCommit(boolean autoCommit) throws SQLException {if (currentAutoCommitStatus == autoCommit) {return;}if (autoCommit) {// According to JDBC spec:// If this method is called during a transaction and the// auto-commit mode is changed, the transaction is committed.if (xaActive) {commit();}} else {if (xaActive) {throw new SQLException("should NEVER happen: setAutoCommit from true to false while xa branch is active");}// Start a XA branchlong branchId = 0L;try {// 1. register branch to TC then get the branchIdbranchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null,null);} catch (TransactionException te) {cleanXABranchContext();throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te);}// 2. build XA-Xid with xid and branchIdthis.xaBranchXid = XAXidBuilder.build(xid, branchId);try {// 3. XA StartxaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS);} catch (XAException e) {cleanXABranchContext();throw new SQLException("failed to start xa branch " + xid + " since " + e.getMessage(), e);}// 4. XA is activethis.xaActive = true;}currentAutoCommitStatus = autoCommit;
}
3.2.3 设置自动提交方式:ConnectionProxyXA#commit
- 终止xaBranchXid分支所执行的工作,即:xaResource.end
- 请求资源管理器准备好xaBranchXid事务的提交工作,即:xaResource.prepare
public void commit() throws SQLException {if (currentAutoCommitStatus) {// Ignore the committing on an autocommit session.return;}if (!xaActive || this.xaBranchXid == null) {throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END);}try {// XA End: SuccessxaResource.end(xaBranchXid, XAResource.TMSUCCESS);// XA PreparexaResource.prepare(xaBranchXid);// Keep the Connection if necessarykeepIfNecessary();} catch (XAException xe) {try {// Branch Report to TC: FailedDefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(),BranchStatus.PhaseOne_Failed, null);} catch (TransactionException te) {LOGGER.warn("Failed to report XA branch commit-failure on " + xid + "-" + xaBranchXid.getBranchId()+ " since " + te.getCode() + ":" + te.getMessage() + " and XAException:" + xe.getMessage());}throw new SQLException("Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe.getMessage(), xe);} finally {cleanXABranchContext();}
}
3.2.4 设置自动提交方式:ConnectionProxyXA#rollback
- 向TC汇报事务执行失败。
- 终止xaBranchXid分支所执行的工作,即:xaResource.end
public void rollback() throws SQLException {if (currentAutoCommitStatus) {// Ignore the committing on an autocommit session.return;}if (!xaActive || this.xaBranchXid == null) {throw new SQLException("should NOT rollback on an inactive session");}try {// Branch Report to TCDefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(), BranchStatus.PhaseOne_Failed, null);} catch (TransactionException te) {// log and ignore the report failureLOGGER.warn("Failed to report XA branch rollback on " + xid + "-" + xaBranchXid.getBranchId()+ " since " + te.getCode() + ":" + te.getMessage());}try {// XA End: FailxaResource.end(xaBranchXid, XAResource.TMFAIL);} catch (XAException xe) {throw new SQLException("Failed to end(TMFAIL) xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe.getMessage(), xe);} finally {cleanXABranchContext();}
}
3.3 TM向TC发送请求,通知进行Global Commit/Rollback
这个过程和AT模式完全相同。
3.4 TC向各个分支发送Branch Commit/Rollback请求
这个过程和AT模式完全相同。
3.5 RM进行Branch Commit/Rollback
AbstractRMHandler收到请求后,最终交给ResourceManagerXA处理,具体的Commit和Rollback交个xaCommit、xaRollback完成,他们直接触发xaResource的commit、rollback方法。
public void xaCommit(String xid, long branchId, String applicationData) throws XAException {XAXid xaXid = XAXidBuilder.build(xid, branchId);xaResource.commit(xaXid, false);releaseIfNecessary();
}public void xaRollback(String xid, long branchId, String applicationData) throws XAException {XAXid xaXid = XAXidBuilder.build(xid, branchId);xaResource.rollback(xaXid);releaseIfNecessary();
}
3.6 异常补偿
异常补偿流程和AT模式完全相同。
4 XAResource
从上面源码层面可以看到,Seata的XA模式是基于jdk的XAResource接口实现的。以下是jdk文档对此接口的描述。
4.1 XAResource简介
XAResource 接口是基于X/Open CAE规范(分布式事务处理:XA 规范)的工业标准XA接口的Java映射。
在分布式事务处理 (DTP) 环境中,XA接口定义资源管理器和事务管理器之间的协定。JDBC驱动程序或JMS提供者实现此接口,以支持全局事务与数据库或消息服务连接之间的关联。
可由应用程序在外部事务管理器控制事务的环境中使用的任何事务资源均可支持XAResource接口。数据库管理系统就属于此类资源。应用程序可以通过多个数据库连接访问数据。通过事务管理器将每个数据库连接作为事务资源添加到列表中。事务管理器为参与全局事务的每个连接获取XAResource。事务管理器使用start方法建立全局事务与资源之间的关联,而使用end方法取消事务与资源之间的关联。资源管理器负责将全局事务关联到在start与end方法调用之间对其数据执行的所有工作。
在事务提交时,事务管理器通知资源管理器根据二阶段提交协议准备、提交或回滚事务。
4.2 方法摘要
返回结果 | 方法签名 | 描述 |
---|---|---|
void | commit(Xid xid, boolean onePhase) | 提交 xid 指定的全局事务。 |
void | end(Xid xid, int flags) | 终止代表事务分支所执行的工作。 |
void | forget(Xid xid) | 告知资源管理器忽略以启发式完成的事务分支。 |
int | getTransactionTimeout() | 获取为此 XAResource 实例设置的当前事务超时值。 |
boolean | isSameRM(XAResource xares) | 调用此方法,以确定目标对象表示的资源管理器实例是否与参数xares表示的资源管理器实例相同。 |
int | prepare(Xid xid) | 请求资源管理器准备好 xid 中指定的事务的事务提交工作。 |
Xid[] | recover(int flag) | 从资源管理器获取准备的事务分支的列表。 |
void | rollback(Xid xid) | 通知资源管理器回滚代表事务分支执行的工作。 |
boolean | setTransactionTimeout(int seconds) | 为此 XAResource 实例设置当前事务超时值。 |
void | start(Xid xid, int flags) | 代表 xid 中指定的事务分支开始工作。 |
4.3 字段摘要
类型 | 属性 | 描述 |
---|---|---|
static int | TMENDRSCAN | 终止恢复扫描。 |
static int | TMFAIL | 取消关联调用者,并将事务分支标记为只回滚。 |
static int | TMJOIN | 调用者正连接现有事务分支。 |
static int | TMNOFLAGS | 使用 TMNOFLAGS 指示不选择任何标志值。 |
static int | TMONEPHASE | 调用者正在使用一阶段优化。 |
static int | TMRESUME | 调用者正在恢复与挂起的事务分支的关联。 |
static int | TMSTARTRSCAN | 启动恢复扫描。 |
static int | TMSUCCESS | 取消调用者与事务分支的关联。 |
static int | TMSUSPEND | 调用者正挂起(不是终止)其与事务分支的关联。 |
static int | XA_OK | 事务工作正常准备就绪。 |
static int | XA_RDONLY | 事务分支是只读的,并且已提交。 |
5 参考文档
Seata XA 模式
Seata 原理
Seata-AT模式 原理
Seata-TCC模式 原理
Seata-Saga模式 原理
Seata-XA模式 原理
TCC-Transaction原理
Seata-XA模式 原理相关推荐
- 还不会分布式事务,seata xa模式入门实战送上
文章目录 前言 一.什么是seata? 二.seata原理说明 1.角色说明 2.什么是 Seata 的事务模式? 三.SEATA 的分布式案例 1.业务逻辑说明 2.架构图 3.SEATA 的分布式 ...
- Seata XA 模式示例分析
文章目录 1 下载示例 2 示例结构 3 业务服务 business-xa 3.1 模块结构 3.2 Controller 层 3.3 Service 层 3.4 stock Feign 客户端 3. ...
- Seata XA 模式理论学习、使用及注意事项 | Spring Cloud54
一.前言 通过以下系列章节: docker-compose 实现Seata Server高可用部署 | Spring Cloud 51 Seata AT 模式理论学习.事务隔离及部分源码解析 | Sp ...
- 阿里分布式事务框架Seata,AT模式原理解析
什么是分布式事务 如今在分布式技术盛行下,许多公司都已经在使用分布式技术了,虽然分布式技术给我们项目带来了三高(高可用,高扩展,高性能)等优点,但是缺点也很明显,分布式项目一般都是分服务开发,且多个服 ...
- 6 张图带你彻底搞懂分布式事务 XA 模式
作者 | 朱晋君 来源 | 阿里巴巴云原生公众号 XA 协议是由 X/Open 组织提出的分布式事务处理规范,主要定义了事务管理器 TM 和局部资源管理器 RM 之间的接口.目前主流的数据库,比如 o ...
- 【Seata】Seata AT和XA模式联系和区别
文章目录 概述 一.分布式事务产生得原因: 1.1.数据库分库分表 1.2应用SOA化 二.分布式事务解决方案XA模式 三.Seata AT(TXC) 模式 3.1基本概念 3.2AT模式工作流程 3 ...
- xa 全局锁_分布式事务如何实现?深入解读 Seata 的 XA 模式
原标题:分布式事务如何实现?深入解读 Seata 的 XA 模式 作者简介:煊檍,GitHub ID:sharajava,阿里巴巴中件间 GTS 研发团队负责人,SEATA 开源项目发起人,曾在 Or ...
- Seata TCC、Saga、XA模式初识
TCC模式 TCC(Try Confirm Cancel)同样也是两阶段提交: 一阶段prepare行为 两阶段commit或rollback行为 和AT模式不同之处在于解决了AT基于支持本地ACID ...
- 深入解读 Seata 的 XA 模式
Seata 1.2.0 版本重磅发布新的事务模式:XA 模式,实现对 XA 协议的支持. 这里,我们从三个方面来深入解读这个新的特性: 1.是什么(What):XA 模式是什么? 2.为什么(Why) ...
最新文章
- 零起点学算法11——求梯形面积
- memcache和memcached的区别
- gson生成jsonobject_GSON形式生成和解析json数据
- 游戏性能优化技术干货分享——内存管理
- 反射练习之越过泛型检查
- Postman的console视图
- 【英语学习】【WOTD】gormless 释义/词源/示例
- CSS框架学习资料汇总
- 人工智能基础——2.3.2产生式系统
- 人工智能对智能建筑有哪些影响,智能建筑发展存在哪些问题?
- Docker 中 latest 标签引发的困惑
- 网易 UI 自动化工具 Airtest
- Restful风格的springMVC配搭ajax请求的小例子
- 达梦数据文件误删了恢复
- 国外数据平台统计分析sdk
- 【shell】笔记|去重复行|删除匹配行|反选删除|反向显示|加减乘除
- 【Unity】获取免费可商用的中文像素字体
- 新手Python爬虫教学(Request+BeautifulSoup)
- android获取各种路径的方法
- xeLaTex调用系统字体