目录

一、用户开启事务示例

1.GlobalTransaction的api方式

2.@GlobalTransaction注解方式

二、TransactionManager

三、DefaultTransactionManager

四、TransactionManagerHolder

五、GlobalTransaction

六、DefaultGlobalTransaction

七、GlobalTransactionContext


一、用户开启事务示例

seata提供了两种方式供用户开启分布式事务管理,一种是具有代码嵌入的api方式,另一种则是常用的注解方式

1.GlobalTransaction的api方式

    public static void main(String[] args) throws SQLException, TransactionException, InterruptedException {String userId = "U100001";String commodityCode = "C00321";int commodityCount = 100;int money = 999;AccountService accountService = new AccountServiceImpl();StorageService storageService = new StorageServiceImpl();OrderService orderService = new OrderServiceImpl();orderService.setAccountService(accountService);//reset dataaccountService.reset(userId, String.valueOf(money));storageService.reset(commodityCode, String.valueOf(commodityCount));orderService.reset(null, null);//init seata; only onceString applicationId = "api";String txServiceGroup = "my_test_tx_group";TMClient.init(applicationId, txServiceGroup);RMClient.init(applicationId, txServiceGroup);//trxGlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();try {tx.begin(60000, "testBiz");System.out.println("begin trx, xid is " + tx.getXid());//biz operate 3 dataSources//set >=5 will be rollback(200*5>999) else will be commitint opCount = 5;storageService.deduct(commodityCode, opCount);orderService.create(userId, commodityCode, opCount);//check data if negativeboolean needCommit = ((StorageServiceImpl)storageService).validNegativeCheck("count", commodityCode)&& ((AccountServiceImpl)accountService).validNegativeCheck("money", userId);//if data negative rollback else commitif (needCommit) {tx.commit();} else {System.out.println("rollback trx, cause: data negative, xid is " + tx.getXid());tx.rollback();}} catch (Exception exx) {System.out.println("rollback trx, cause: " + exx.getMessage() + " , xid is " + tx.getXid());tx.rollback();throw exx;}TimeUnit.SECONDS.sleep(10);}

2.@GlobalTransaction注解方式

    @GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")public void purchase(String userId, String commodityCode, int orderCount) {LOGGER.info("purchase begin ... xid: " + RootContext.getXID());storageService.deduct(commodityCode, orderCount);orderService.create(userId, commodityCode, orderCount);throw new RuntimeException("xxx");}

下面我们分析它是如何通过TransactionManager和GlobalTransaction实现的

二、TransactionManager

TransactionManager接口提供4个方法,开启全球事务,提交全球事务,回滚全球事务和获取当前事务的状态。

public interface TransactionManager {String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException;GlobalStatus commit(String xid) throws TransactionException;GlobalStatus rollback(String xid) throws TransactionException;GlobalStatus getStatus(String xid) throws TransactionException;
}

三、DefaultTransactionManager

DefaultTransactionManager实现TransactionManager接口,4个方法分别创建对应的请求对象,调用TmRpcClient客户端使用netty连接将数据传给TC。

public class DefaultTransactionManager implements TransactionManager {@Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {GlobalBeginRequest request = new GlobalBeginRequest();request.setTransactionName(name);request.setTimeout(timeout);GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);return response.getXid();}@Overridepublic GlobalStatus commit(String xid) throws TransactionException {GlobalCommitRequest globalCommit = new GlobalCommitRequest();globalCommit.setXid(xid);GlobalCommitResponse response = (GlobalCommitResponse)syncCall(globalCommit);return response.getGlobalStatus();}@Overridepublic GlobalStatus rollback(String xid) throws TransactionException {GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();globalRollback.setXid(xid);GlobalRollbackResponse response = (GlobalRollbackResponse)syncCall(globalRollback);return response.getGlobalStatus();}@Overridepublic GlobalStatus getStatus(String xid) throws TransactionException {GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();queryGlobalStatus.setXid(xid);GlobalStatusResponse response = (GlobalStatusResponse)syncCall(queryGlobalStatus);return response.getGlobalStatus();}private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {try {return (AbstractTransactionResponse)TmRpcClient.getInstance().sendMsgWithResponse(request);} catch (TimeoutException toe) {throw new TransactionException(TransactionExceptionCode.IO, toe);}}
}

四、TransactionManagerHolder

TransactionManagerHolder为创建单例TransactionManager的工厂,可以使用EnhancedServiceLoader的spi机制加载用户自定义的类,默认为DefaultTransactionManager。

public class TransactionManagerHolder {private static class SingletonHolder {private static TransactionManager INSTANCE = null;static {try {INSTANCE = EnhancedServiceLoader.load(TransactionManager.class);LOGGER.info("TransactionManager Singleton " + INSTANCE);} catch (Throwable anyEx) {LOGGER.error("Failed to load TransactionManager Singleton! ", anyEx);}}}public static TransactionManager get() {if (SingletonHolder.INSTANCE == null) {throw new ShouldNeverHappenException("TransactionManager is NOT ready!");}return SingletonHolder.INSTANCE;}private TransactionManagerHolder() {}
}

五、GlobalTransaction

GlobalTransaction接口提供给用户开启事务(超时时间,全局事务名称),提交,回滚,获取状态方法。

public interface GlobalTransaction {void begin() throws TransactionException;void begin(int timeout) throws TransactionException;void begin(int timeout, String name) throws TransactionException;void commit() throws TransactionException;void rollback() throws TransactionException;GlobalStatus getStatus() throws TransactionException;String getXid();
}

六、DefaultGlobalTransaction

DefaultGlobalTransaction是GlobalTransaction接口的默认实现,它持有TransactionManager对象,默认开启事务超时时间为60秒,默认名称为default,因为调用者的业务方法可能多重嵌套创建多个GlobalTransaction对象开启事务方法,因此
GlobalTransaction有GlobalTransactionRole角色属性,只有Launcher角色的才有开启、提交、回滚事务的权利。

public class DefaultGlobalTransaction implements GlobalTransaction {private static final int DEFAULT_GLOBAL_TX_TIMEOUT = 60000;private static final String DEFAULT_GLOBAL_TX_NAME = "default";private TransactionManager transactionManager;private String xid;private GlobalStatus status;private GlobalTransactionRole role;DefaultGlobalTransaction() {this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);}DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {this.transactionManager = TransactionManagerHolder.get();this.xid = xid;this.status = status;this.role = role;}@Overridepublic void begin(int timeout, String name) throws TransactionException {// Launcher角色才可以开启if (role != GlobalTransactionRole.Launcher) {check();if (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Begin(): just involved in global transaction [" + xid + "]");}return;}if (xid != null) {throw new IllegalStateException();}if (RootContext.getXID() != null) {throw new IllegalStateException();}// 委派给transactionManager开启事务xid = transactionManager.begin(null, null, name, timeout);status = GlobalStatus.Begin;RootContext.bind(xid);if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction [" + xid + "]");}}@Overridepublic void commit() throws TransactionException {// Launcher角色才可以提交if (role == GlobalTransactionRole.Participant) {// Participant has no responsibility of committingif (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Commit(): just involved in global transaction [" + xid + "]");}return;}if (xid == null) {throw new IllegalStateException();}status = transactionManager.commit(xid);if (RootContext.getXID() != null) {if (xid.equals(RootContext.getXID())) {RootContext.unbind();}}if (LOGGER.isInfoEnabled()) {LOGGER.info("[" + xid + "] commit status:" + status);}}@Overridepublic void rollback() throws TransactionException {// Launcher角色才可以回滚if (role == GlobalTransactionRole.Participant) {// Participant has no responsibility of committingif (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Rollback(): just involved in global transaction [" + xid + "]");}return;}if (xid == null) {throw new IllegalStateException();}status = transactionManager.rollback(xid);if (RootContext.getXID() != null) {if (xid.equals(RootContext.getXID())) {RootContext.unbind();}}if (LOGGER.isInfoEnabled()) {LOGGER.info("[" + xid + "] rollback status:" + status);}}@Overridepublic GlobalStatus getStatus() throws TransactionException {if (xid == null) {return GlobalStatus.UnKnown;}status = transactionManager.getStatus(xid);return status;}@Overridepublic String getXid() {return xid;}// Participant角色assert,当前线程应该有xid信息。private void check() {if (xid == null) {throw new ShouldNeverHappenException();}}
}

七、GlobalTransactionContext

GlobalTransactionContext为操作GlobalTransaction的工具类,提供创建新的GlobalTransaction,获取当前线程有的GlobalTransaction等方法。

public class GlobalTransactionContext {private GlobalTransactionContext() {}// 创建新的DefaultGlobalTransactionprivate static GlobalTransaction createNew() {GlobalTransaction tx = new DefaultGlobalTransaction();return tx;}// 获取绑定到当前线程的GlobalTransaction实例,如果当前线程没有,返回nullprivate static GlobalTransaction getCurrent() {String xid = RootContext.getXID();if (xid == null) {return null;}return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);}// 获取绑定到当前线程的GlobalTransaction实例,如果当前线程没有,创建实例public static GlobalTransaction getCurrentOrCreate() {GlobalTransaction tx = getCurrent();if (tx == null) {return createNew();}return tx;}// 通过给的xid,重新加载GlobalTransactionpublic static GlobalTransaction reload(String xid) throws TransactionException {GlobalTransaction tx = new DefaultGlobalTransaction(xid, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher) {@Overridepublic void begin(int timeout, String name) throws TransactionException {throw new IllegalStateException("Never BEGIN on a RELOADED GlobalTransaction. ");}};return tx;}
}

Seata源码分析之TransactionManager(一)相关推荐

  1. Seata 源码分析 - tm、rm 中 xid 传递过程

    一.Seata 前面文章讲解了对 Seata 的 AT 和 TCC 模式的使用,本篇文章为大家讲解下 Seata 中 TM.RM 中 xid 传递过程,如果不了解 Seata 中的 xid,可以理解为 ...

  2. 阿里开源一站式分布式事务框架seata源码分析(AT模式下TM与RM分析)

    序言: 对于阿里开源分布式事务框架seata的详细了解可以参考官网,这里不会详细介绍.本章只会介绍seata中AT模式的源码分析(对阿seata有一定了解或者成功完成过demo). seata中一个事 ...

  3. seata源码分析之全局事务的开启跟xid的传递

    概览 首先我们通过@GlobalTransactional这个注解开启一个全局事务,而GlobalTransactionScanner.wrapIfNecessary()会为所有方法上加了这个注解的b ...

  4. mybatis源码分析之事务管理器

    2019独角兽企业重金招聘Python工程师标准>>> 上一篇:mybatis源码分析之Configuration 主要分析了构建SqlSessionFactory的过程中配置文件的 ...

  5. mvc设计模式现在过时了吗_尚学堂115——设计模式、源码分析以及SpringData

    设计模式 什么是设计模式?你是否在你的代码里面使用过任何设计模式? 设计模式是在软件设计中常见问题的通用.可反复使用.多数人知晓的一种解决方案或模板:这些解决方案是在相当长的一段时间内由众多软件开发人 ...

  6. MyBatis 源码分析系列文章导读

    1.本文速览 本篇文章是我为接下来的 MyBatis 源码分析系列文章写的一个导读文章.本篇文章从 MyBatis 是什么(what),为什么要使用(why),以及如何使用(how)等三个角度进行了说 ...

  7. 源码 状态机_阿里中间件seata源码剖析七:saga模式实现

    saga模式是分布式事务中使用比较多的一种模式,他主要应用在长流程的服务,对一个全局事务,如果某个节点抛出了异常,则从这个节点往前依次回滚或补偿事务.今天我们就来看看它的源码实现. 状态机初始化 在之 ...

  8. 从源码分析 Spring 基于注解的事务

    从源码分析 Spring 基于注解的事务 在spring引入基于注解的事务(@Transactional)之前,我们一般都是如下这样进行拦截事务的配置: <!-- 拦截器方式配置事务 --> ...

  9. Mybatis 源码分析(一)配置文件加载流程

    Mybatis 源码分析(一)配置文件加载流程 1.项目构建 引入依赖 <dependency><groupId>org.mybatis</groupId>< ...

最新文章

  1. Spark的transformation和action算子简介
  2. 尚学python课程---11、linux环境下安装python注意
  3. 表单中的只读和禁用属性
  4. npm安装和Vue运行
  5. 服务器虚拟化 需求,虚拟化的优势与需求分析
  6. 火柴人小程序linux,火柴人你瞅啥小程序
  7. Java案例:接口的静态方法
  8. ubuntu13.04中把ibus中的中文拼音输入设为默认
  9. HDU 5975 2016ICPC大连 E: Aninteresting game(树状数组原理)
  10. 模板题——贪心(2)
  11. Survivor空间溢出实例
  12. w ndows10更新后变成32位了,三种直接从 Win7 升级到 Win10 的方法
  13. 手机PDF文档如何解密去除不能编辑的限制?
  14. 如何用计算机对cad的草图,cad文件导入草图大师的方法步骤
  15. PDF书签制作的方法!
  16. linux操作系统未来的发展方向,2.9 操作系统的未来发展趋势
  17. CSS属性vertical-align详解(CSS之五)
  18. 学渣的刷题之旅 leetcode刷题 9. 回文数
  19. android java join_java中的join用法
  20. 华为大数据云issues

热门文章

  1. 英特尔酷睿 i9-11900K 首发评测优势分析,新架构Cypress Cove较上代性能提高19%
  2. java截图工具的实现
  3. 华为esight搭建之一(环境的搭建)
  4. ENVI/IDL 编程:批量裁剪同一地区的多幅影像
  5. python 常用镜像源
  6. 怎么找到没有广告的安全浏览器?这款干净实用
  7. hbase几种查询方式对比
  8. 走进阿里:现任阿里技术团队负责人,公开解密阿里新一代核心技术
  9. “书路计划”助力教育扶智,亚马逊发起儿童数字阅读公益生态
  10. TOJ 3287 Sudoku 9*9数独 dfs