Seata源码分析之TransactionManager(一)
目录
一、用户开启事务示例
1.GlobalTransaction的api方式
2.@GlobalTransaction注解方式
二、TransactionManager
三、DefaultTransactionManager
四、TransactionManagerHolder
五、GlobalTransaction
六、DefaultGlobalTransaction
七、GlobalTransactionContext
一、用户开启事务示例
seata提供了两种方式供用户开启分布式事务管理,一种是具有代码嵌入的api方式,另一种则是常用的注解方式
1.GlobalTransaction的api方式
public static void main(String[] args) throws SQLException, TransactionException, InterruptedException {String userId = "U100001";String commodityCode = "C00321";int commodityCount = 100;int money = 999;AccountService accountService = new AccountServiceImpl();StorageService storageService = new StorageServiceImpl();OrderService orderService = new OrderServiceImpl();orderService.setAccountService(accountService);//reset dataaccountService.reset(userId, String.valueOf(money));storageService.reset(commodityCode, String.valueOf(commodityCount));orderService.reset(null, null);//init seata; only onceString applicationId = "api";String txServiceGroup = "my_test_tx_group";TMClient.init(applicationId, txServiceGroup);RMClient.init(applicationId, txServiceGroup);//trxGlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();try {tx.begin(60000, "testBiz");System.out.println("begin trx, xid is " + tx.getXid());//biz operate 3 dataSources//set >=5 will be rollback(200*5>999) else will be commitint opCount = 5;storageService.deduct(commodityCode, opCount);orderService.create(userId, commodityCode, opCount);//check data if negativeboolean needCommit = ((StorageServiceImpl)storageService).validNegativeCheck("count", commodityCode)&& ((AccountServiceImpl)accountService).validNegativeCheck("money", userId);//if data negative rollback else commitif (needCommit) {tx.commit();} else {System.out.println("rollback trx, cause: data negative, xid is " + tx.getXid());tx.rollback();}} catch (Exception exx) {System.out.println("rollback trx, cause: " + exx.getMessage() + " , xid is " + tx.getXid());tx.rollback();throw exx;}TimeUnit.SECONDS.sleep(10);}
2.@GlobalTransaction注解方式
@GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")public void purchase(String userId, String commodityCode, int orderCount) {LOGGER.info("purchase begin ... xid: " + RootContext.getXID());storageService.deduct(commodityCode, orderCount);orderService.create(userId, commodityCode, orderCount);throw new RuntimeException("xxx");}
下面我们分析它是如何通过TransactionManager和GlobalTransaction实现的
二、TransactionManager
TransactionManager接口提供4个方法,开启全球事务,提交全球事务,回滚全球事务和获取当前事务的状态。
public interface TransactionManager {String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException;GlobalStatus commit(String xid) throws TransactionException;GlobalStatus rollback(String xid) throws TransactionException;GlobalStatus getStatus(String xid) throws TransactionException;
}
三、DefaultTransactionManager
DefaultTransactionManager实现TransactionManager接口,4个方法分别创建对应的请求对象,调用TmRpcClient客户端使用netty连接将数据传给TC。
public class DefaultTransactionManager implements TransactionManager {@Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {GlobalBeginRequest request = new GlobalBeginRequest();request.setTransactionName(name);request.setTimeout(timeout);GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);return response.getXid();}@Overridepublic GlobalStatus commit(String xid) throws TransactionException {GlobalCommitRequest globalCommit = new GlobalCommitRequest();globalCommit.setXid(xid);GlobalCommitResponse response = (GlobalCommitResponse)syncCall(globalCommit);return response.getGlobalStatus();}@Overridepublic GlobalStatus rollback(String xid) throws TransactionException {GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();globalRollback.setXid(xid);GlobalRollbackResponse response = (GlobalRollbackResponse)syncCall(globalRollback);return response.getGlobalStatus();}@Overridepublic GlobalStatus getStatus(String xid) throws TransactionException {GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();queryGlobalStatus.setXid(xid);GlobalStatusResponse response = (GlobalStatusResponse)syncCall(queryGlobalStatus);return response.getGlobalStatus();}private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {try {return (AbstractTransactionResponse)TmRpcClient.getInstance().sendMsgWithResponse(request);} catch (TimeoutException toe) {throw new TransactionException(TransactionExceptionCode.IO, toe);}}
}
四、TransactionManagerHolder
TransactionManagerHolder为创建单例TransactionManager的工厂,可以使用EnhancedServiceLoader的spi机制加载用户自定义的类,默认为DefaultTransactionManager。
public class TransactionManagerHolder {private static class SingletonHolder {private static TransactionManager INSTANCE = null;static {try {INSTANCE = EnhancedServiceLoader.load(TransactionManager.class);LOGGER.info("TransactionManager Singleton " + INSTANCE);} catch (Throwable anyEx) {LOGGER.error("Failed to load TransactionManager Singleton! ", anyEx);}}}public static TransactionManager get() {if (SingletonHolder.INSTANCE == null) {throw new ShouldNeverHappenException("TransactionManager is NOT ready!");}return SingletonHolder.INSTANCE;}private TransactionManagerHolder() {}
}
五、GlobalTransaction
GlobalTransaction接口提供给用户开启事务(超时时间,全局事务名称),提交,回滚,获取状态方法。
public interface GlobalTransaction {void begin() throws TransactionException;void begin(int timeout) throws TransactionException;void begin(int timeout, String name) throws TransactionException;void commit() throws TransactionException;void rollback() throws TransactionException;GlobalStatus getStatus() throws TransactionException;String getXid();
}
六、DefaultGlobalTransaction
DefaultGlobalTransaction是GlobalTransaction接口的默认实现,它持有TransactionManager对象,默认开启事务超时时间为60秒,默认名称为default,因为调用者的业务方法可能多重嵌套创建多个GlobalTransaction对象开启事务方法,因此
GlobalTransaction有GlobalTransactionRole角色属性,只有Launcher角色的才有开启、提交、回滚事务的权利。
public class DefaultGlobalTransaction implements GlobalTransaction {private static final int DEFAULT_GLOBAL_TX_TIMEOUT = 60000;private static final String DEFAULT_GLOBAL_TX_NAME = "default";private TransactionManager transactionManager;private String xid;private GlobalStatus status;private GlobalTransactionRole role;DefaultGlobalTransaction() {this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);}DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {this.transactionManager = TransactionManagerHolder.get();this.xid = xid;this.status = status;this.role = role;}@Overridepublic void begin(int timeout, String name) throws TransactionException {// Launcher角色才可以开启if (role != GlobalTransactionRole.Launcher) {check();if (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Begin(): just involved in global transaction [" + xid + "]");}return;}if (xid != null) {throw new IllegalStateException();}if (RootContext.getXID() != null) {throw new IllegalStateException();}// 委派给transactionManager开启事务xid = transactionManager.begin(null, null, name, timeout);status = GlobalStatus.Begin;RootContext.bind(xid);if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction [" + xid + "]");}}@Overridepublic void commit() throws TransactionException {// Launcher角色才可以提交if (role == GlobalTransactionRole.Participant) {// Participant has no responsibility of committingif (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Commit(): just involved in global transaction [" + xid + "]");}return;}if (xid == null) {throw new IllegalStateException();}status = transactionManager.commit(xid);if (RootContext.getXID() != null) {if (xid.equals(RootContext.getXID())) {RootContext.unbind();}}if (LOGGER.isInfoEnabled()) {LOGGER.info("[" + xid + "] commit status:" + status);}}@Overridepublic void rollback() throws TransactionException {// Launcher角色才可以回滚if (role == GlobalTransactionRole.Participant) {// Participant has no responsibility of committingif (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Rollback(): just involved in global transaction [" + xid + "]");}return;}if (xid == null) {throw new IllegalStateException();}status = transactionManager.rollback(xid);if (RootContext.getXID() != null) {if (xid.equals(RootContext.getXID())) {RootContext.unbind();}}if (LOGGER.isInfoEnabled()) {LOGGER.info("[" + xid + "] rollback status:" + status);}}@Overridepublic GlobalStatus getStatus() throws TransactionException {if (xid == null) {return GlobalStatus.UnKnown;}status = transactionManager.getStatus(xid);return status;}@Overridepublic String getXid() {return xid;}// Participant角色assert,当前线程应该有xid信息。private void check() {if (xid == null) {throw new ShouldNeverHappenException();}}
}
七、GlobalTransactionContext
GlobalTransactionContext为操作GlobalTransaction的工具类,提供创建新的GlobalTransaction,获取当前线程有的GlobalTransaction等方法。
public class GlobalTransactionContext {private GlobalTransactionContext() {}// 创建新的DefaultGlobalTransactionprivate static GlobalTransaction createNew() {GlobalTransaction tx = new DefaultGlobalTransaction();return tx;}// 获取绑定到当前线程的GlobalTransaction实例,如果当前线程没有,返回nullprivate static GlobalTransaction getCurrent() {String xid = RootContext.getXID();if (xid == null) {return null;}return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);}// 获取绑定到当前线程的GlobalTransaction实例,如果当前线程没有,创建实例public static GlobalTransaction getCurrentOrCreate() {GlobalTransaction tx = getCurrent();if (tx == null) {return createNew();}return tx;}// 通过给的xid,重新加载GlobalTransactionpublic static GlobalTransaction reload(String xid) throws TransactionException {GlobalTransaction tx = new DefaultGlobalTransaction(xid, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher) {@Overridepublic void begin(int timeout, String name) throws TransactionException {throw new IllegalStateException("Never BEGIN on a RELOADED GlobalTransaction. ");}};return tx;}
}
Seata源码分析之TransactionManager(一)相关推荐
- Seata 源码分析 - tm、rm 中 xid 传递过程
一.Seata 前面文章讲解了对 Seata 的 AT 和 TCC 模式的使用,本篇文章为大家讲解下 Seata 中 TM.RM 中 xid 传递过程,如果不了解 Seata 中的 xid,可以理解为 ...
- 阿里开源一站式分布式事务框架seata源码分析(AT模式下TM与RM分析)
序言: 对于阿里开源分布式事务框架seata的详细了解可以参考官网,这里不会详细介绍.本章只会介绍seata中AT模式的源码分析(对阿seata有一定了解或者成功完成过demo). seata中一个事 ...
- seata源码分析之全局事务的开启跟xid的传递
概览 首先我们通过@GlobalTransactional这个注解开启一个全局事务,而GlobalTransactionScanner.wrapIfNecessary()会为所有方法上加了这个注解的b ...
- mybatis源码分析之事务管理器
2019独角兽企业重金招聘Python工程师标准>>> 上一篇:mybatis源码分析之Configuration 主要分析了构建SqlSessionFactory的过程中配置文件的 ...
- mvc设计模式现在过时了吗_尚学堂115——设计模式、源码分析以及SpringData
设计模式 什么是设计模式?你是否在你的代码里面使用过任何设计模式? 设计模式是在软件设计中常见问题的通用.可反复使用.多数人知晓的一种解决方案或模板:这些解决方案是在相当长的一段时间内由众多软件开发人 ...
- MyBatis 源码分析系列文章导读
1.本文速览 本篇文章是我为接下来的 MyBatis 源码分析系列文章写的一个导读文章.本篇文章从 MyBatis 是什么(what),为什么要使用(why),以及如何使用(how)等三个角度进行了说 ...
- 源码 状态机_阿里中间件seata源码剖析七:saga模式实现
saga模式是分布式事务中使用比较多的一种模式,他主要应用在长流程的服务,对一个全局事务,如果某个节点抛出了异常,则从这个节点往前依次回滚或补偿事务.今天我们就来看看它的源码实现. 状态机初始化 在之 ...
- 从源码分析 Spring 基于注解的事务
从源码分析 Spring 基于注解的事务 在spring引入基于注解的事务(@Transactional)之前,我们一般都是如下这样进行拦截事务的配置: <!-- 拦截器方式配置事务 --> ...
- Mybatis 源码分析(一)配置文件加载流程
Mybatis 源码分析(一)配置文件加载流程 1.项目构建 引入依赖 <dependency><groupId>org.mybatis</groupId>< ...
最新文章
- Spark的transformation和action算子简介
- 尚学python课程---11、linux环境下安装python注意
- 表单中的只读和禁用属性
- npm安装和Vue运行
- 服务器虚拟化 需求,虚拟化的优势与需求分析
- 火柴人小程序linux,火柴人你瞅啥小程序
- Java案例:接口的静态方法
- ubuntu13.04中把ibus中的中文拼音输入设为默认
- HDU 5975 2016ICPC大连 E: Aninteresting game(树状数组原理)
- 模板题——贪心(2)
- Survivor空间溢出实例
- w ndows10更新后变成32位了,三种直接从 Win7 升级到 Win10 的方法
- 手机PDF文档如何解密去除不能编辑的限制?
- 如何用计算机对cad的草图,cad文件导入草图大师的方法步骤
- PDF书签制作的方法!
- linux操作系统未来的发展方向,2.9 操作系统的未来发展趋势
- CSS属性vertical-align详解(CSS之五)
- 学渣的刷题之旅 leetcode刷题 9. 回文数
- android java join_java中的join用法
- 华为大数据云issues