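What is TCC

TCC is a two-phase commit protocol for distributed transactions. The name stands for Try-Confirm-Cancel: resource reservation (Try), confirmation (Confirm), and cancellation (Cancel). The three phases mean the following:
Try phase: attempt execution. Complete all business checks (consistency) and reserve the required business resources (quasi-isolation).
Confirm phase: actually execute the business operation. It performs no further business checks and uses only the resources reserved in the Try phase. Confirm must be idempotent, because a failed Confirm has to be retried.
Cancel phase: cancel execution and release the business resources reserved in the Try phase. Cancel must also be idempotent, and failures in Cancel are handled in essentially the same way as failures in Confirm.
To summarize how TCC should be understood:
Under the assumption that the Confirm and Cancel tasks will eventually succeed, TCC is still a 2PC protocol.
In a distributed environment it provides no global lock to control resource contention; it only agrees on a procedure for rolling back when something goes wrong.
For that reason it performs better than AT mode.

TCC is an intrusive distributed transaction solution: all three operations must be implemented by the business system itself, which makes it highly invasive and comparatively complex to design. The advantage is that TCC does not depend on the database at all. It can manage resources across databases and across applications, combining these different kinds of data access into one atomic operation through explicit code (either everything commits or everything rolls back), which makes it a better fit for distributed transactions in complex business scenarios.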

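Demo scenario

Take order placement in an e-commerce system as the example. To keep the demo simple, the account service is left out and only the order service and the inventory service are covered.
The flow is:
1. The client calls the place-order API
2. Deduct inventory
3. Create the order
4. The request completes
From this flow, the order service is clearly the main business service and the initiator of the transaction, while the inventory service is a secondary business service that takes part in the transaction's decision.

Pseudocode for the Seata AT mode solution:

@GlobalTransactional
public Result<Void> createOrder(Long productId, Long num, .....) {
    // 1. deduct inventory
    reduceStorage();
    // 2. create the order
    saveOrder();
    // return value omitted in this pseudocode
}

The @GlobalTransactional annotation starts a global transaction.
AT mode has limitations, however:
Low performance, because resources stay locked for too long
It relies on the database's local transactions, so it cannot cover resources in other applications that are not backed by a supported database
So for a place-order API where performance matters, consider TCC mode and split the work into two phases. Resources are then locked for a shorter time over the whole flow, and performance improves.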
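With TCC mode, the flow is split as follows:
1. Phase-one Try operation
The Try phase of TCC is really about reserving resources. Here the required quantity of stock can be frozen, which means the inventory table has to maintain a frozen-stock column.
Pseudocode:

@Transactional
public boolean tryCreateOrder() {   // "try" itself is a reserved word in Java
    // freeze the stock
    frozenStorage();
    // create the order with status "pending confirmation"
    saveOrder();
    return true;
}

Note: @Transactional starts a local transaction. If any exception is thrown, the local transaction rolls back and the phase-two cancel operation is executed.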
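2. Phase-two confirm operation
The confirm operation commits the transaction after the phase-one try operation has succeeded. It involves:
Releasing the stock frozen by try (frozen stock - purchased quantity)
Marking the order as completed
Pseudocode:

@Transactional
public boolean confirm() {
    // release the stock reserved by try
    cleanFrozen();
    // update the order status to "completed"
    updateOrder();
    return true;
}

Note: if this returns false, the TCC convention is to keep retrying until confirm completes.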
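3. Phase-two cancel operation
The cancel operation runs after the phase-one try operation has hit an exception and rolls the resources back. It involves:
Restoring the frozen stock (frozen stock - purchased quantity, available stock + purchased quantity)
Deleting the order

@Transactional
public boolean cancel() {
    // give the stock frozen by try back to the available stock
    rollbackFrozen();
    // delete the pending order
    delOrder();
    return true;
}

Note: if this returns false, the TCC convention is to keep retrying until cancel completes.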
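
The freeze/release bookkeeping described above can be sketched in memory. This is only an illustration under stated assumptions: InventoryTccSketch and its method names are hypothetical, two maps stand in for the stock and frozen-stock columns, and the methods are deliberately not idempotent yet (that problem is discussed later in the article).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the inventory table: available and frozen stock per product. */
class InventoryTccSketch {
    private final Map<Long, Integer> available = new HashMap<>();
    private final Map<Long, Integer> frozen = new HashMap<>();

    synchronized void addStock(long productId, int amount) {
        available.merge(productId, amount, Integer::sum);
    }

    /** Try: check the stock, then move the requested amount from available to frozen. */
    synchronized boolean tryFreeze(long productId, int num) {
        int current = available.getOrDefault(productId, 0);
        if (num <= 0 || current < num) {
            return false; // business check failed, nothing was reserved
        }
        available.put(productId, current - num);
        frozen.merge(productId, num, Integer::sum);
        return true;
    }

    /** Confirm: consume the frozen amount; available stock was already reduced in Try. */
    synchronized boolean confirm(long productId, int num) {
        frozen.merge(productId, -num, Integer::sum);
        return true;
    }

    /** Cancel: move the frozen amount back to the available stock. */
    synchronized boolean cancel(long productId, int num) {
        frozen.merge(productId, -num, Integer::sum);
        available.merge(productId, num, Integer::sum);
        return true;
    }

    synchronized int available(long productId) {
        return available.getOrDefault(productId, 0);
    }

    synchronized int frozen(long productId) {
        return frozen.getOrDefault(productId, 0);
    }
}
```

Because Try already subtracts from the available stock, Confirm only has to clear the frozen counter, while Cancel has to touch both counters. That matches the two formulas given for confirm and cancel above.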

Seata TCC mode example (Dubbo)

Suppose a business operation needs both service A and service B to complete one transaction. In service A we define a TCC interface for that service:

public interface TccActionOne {
    @TwoPhaseBusinessAction(name = "DubboTccActionOne", commitMethod = "commit", rollbackMethod = "rollback")
    public boolean prepare(BusinessActionContext actionContext,
                           @BusinessActionContextParameter(paramName = "a") int a);

    public boolean commit(BusinessActionContext actionContext);

    public boolean rollback(BusinessActionContext actionContext);
}

Likewise, in service B we define a TCC interface for that service:

public interface TccActionTwo {
    @TwoPhaseBusinessAction(name = "DubboTccActionTwo", commitMethod = "commit", rollbackMethod = "rollback")
    public boolean prepare(BusinessActionContext actionContext,
                           @BusinessActionContextParameter(paramName = "b") String b,
                           @BusinessActionContextParameter(paramName = "c") List list);

    public boolean commit(BusinessActionContext actionContext);

    public boolean rollback(BusinessActionContext actionContext);
}

The business system calls both services and requires that they either both succeed or both fail:

@GlobalTransactional
public String doTransactionCommit() {
    // first TCC transaction participant
    boolean result = tccActionOne.prepare(null, 1);
    if (!result) {
        throw new RuntimeException("TccActionOne failed.");
    }
    List list = new ArrayList();
    list.add("c1");
    list.add("c2");
    // second TCC transaction participant
    result = tccActionTwo.prepare(null, "two", list);
    if (!result) {
        throw new RuntimeException("TccActionTwo failed.");
    }
    return RootContext.getXID();
}

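That is an example of a global transaction implemented with Seata TCC mode. TCC mode also uses the @GlobalTransactional annotation to start the global transaction, while the TCC interfaces of service A and service B are the transaction participants. Seata treats each TCC interface as a Resource, also called a TCC Resource.
When Seata starts, it scans and parses the TCC interfaces. If a TCC interface belongs to a provider, Seata registers a TCC Resource with the TC at startup, and every TCC Resource has a resource ID. If a TCC interface belongs to a consumer, Seata proxies the consumer. As in AT mode, the proxy intercepts calls to the TCC interface: each call to the Try method first registers a branch transaction with the TC and only then performs the original RPC call.
When the global transaction is resolved to commit or roll back, the TC uses the resource ID recorded at branch registration to call back into the matching participant service and run the Confirm or Cancel (commit/rollback) method of its TCC Resource, so that the participants either all succeed or all fail.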
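
The register-then-call-back flow can be sketched with a toy coordinator. This is an illustration only, not Seata's implementation: TccParticipant, RecordingParticipant and MiniCoordinator are hypothetical names, everything runs in one JVM, and there are no retries or RPC calls.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical participant contract, loosely mirroring prepare/commit/rollback. */
interface TccParticipant {
    boolean prepare();
    void commit();
    void rollback();
}

/** Participant that records which methods were called, for demonstration. */
class RecordingParticipant implements TccParticipant {
    private final boolean tryResult;
    final List<String> calls = new ArrayList<>();

    RecordingParticipant(boolean tryResult) {
        this.tryResult = tryResult;
    }

    public boolean prepare() { calls.add("prepare"); return tryResult; }
    public void commit() { calls.add("commit"); }
    public void rollback() { calls.add("rollback"); }
}

/** Toy coordinator: registers a branch before each Try, then decides commit or rollback. */
class MiniCoordinator {
    private final List<TccParticipant> branches = new ArrayList<>();

    boolean run(List<? extends TccParticipant> participants) {
        for (TccParticipant p : participants) {
            branches.add(p);            // branch registration happens before the Try call
            if (!p.prepare()) {
                // a failed Try resolves the whole transaction to rollback, newest branch first
                for (int i = branches.size() - 1; i >= 0; i--) {
                    branches.get(i).rollback();
                }
                return false;
            }
        }
        for (TccParticipant p : branches) {
            p.commit();
        }
        return true;
    }
}
```

Note that the branch whose Try failed also receives a rollback call, because it was registered before Try ran. That detail is what makes the "empty rollback" case discussed later possible.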

How Seata implements TCC

GlobalTransactionScanner

public class GlobalTransactionScanner extends AbstractAutoProxyCreator
    implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {

It implements InitializingBean, so afterPropertiesSet runs once the bean has been loaded.

@Override
public void afterPropertiesSet() {
    if (disableGlobalTransaction) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global transaction is disabled.");
        }
        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
            (ConfigurationChangeListener)this);
        return;
    }
    if (initialized.compareAndSet(false, true)) {
        initClient();
    }
}

Initialize the TM and RM clients

private void initClient() {
    //init TM
    TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
    //init RM
    RMClient.init(applicationId, txServiceGroup);
    registerSpringShutdownHook();
}

Initialize the Netty client

public void init() {
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    clientBootstrap.start();
}

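This starts two things:
A thread that checks the connection to seata-server every 10 seconds and reconnects if needed.
A sender thread that keeps taking messages out of basketMap.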

protected final ConcurrentHashMap<String/*serverAddress*/, BlockingQueue<RpcMessage>> basketMap = new ConcurrentHashMap<>();

private class MergedSendRunnable implements Runnable {
    @Override
    public void run() {
        while (true) {
            synchronized (mergeLock) {
                try {
                    mergeLock.wait(MAX_MERGE_SEND_MILLS);
                } catch (InterruptedException e) {
                }
            }
            isSending = true;
            basketMap.forEach((address, basket) -> {
                if (basket.isEmpty()) {
                    return;
                }
                MergedWarpMessage mergeMessage = new MergedWarpMessage();
                while (!basket.isEmpty()) {
                    RpcMessage msg = basket.poll();
                    mergeMessage.msgs.add((AbstractMessage) msg.getBody());
                    mergeMessage.msgIds.add(msg.getId());
                }
                if (mergeMessage.msgIds.size() > 1) {
                    printMergeMessageLog(mergeMessage);
                }
                Channel sendChannel = null;
                try {
                    // send batch message is sync request, but there is no need to get the return value.
                    // Since the messageFuture has been created before the message is placed in basketMap,
                    // the return value will be obtained in ClientOnResponseProcessor.
                    sendChannel = clientChannelManager.acquireChannel(address);
                    AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
                } catch (FrameworkException e) {
                    if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
                        destroyChannel(address, sendChannel);
                    }
                    // fast fail
                    for (Integer msgId : mergeMessage.msgIds) {
                        MessageFuture messageFuture = futures.remove(msgId);
                        if (messageFuture != null) {
                            messageFuture.setResultMessage(null);
                        }
                    }
                    LOGGER.error("client merge call failed: {}", e.getMessage(), e);
                }
            });
            isSending = false;
        }
    }
}

When a message needs to be sent, it is put into basketMap and this thread sends it, so basketMap acts as a send buffer.
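
The buffering idea can be shown without Netty. The sketch below is a simplification under stated assumptions: MergeBufferSketch is a hypothetical class, messages are plain strings, and drainOnce performs one pass of what the sender loop does on each wake-up, returning the batches instead of writing them to a channel.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical per-address send buffer: producers offer messages, one sender drains them in batches. */
class MergeBufferSketch {
    // server address -> pending messages for that address
    private final Map<String, BlockingQueue<String>> basketMap = new ConcurrentHashMap<>();

    /** Called by business threads: enqueue a message for the given server address. */
    void offer(String serverAddress, String message) {
        basketMap.computeIfAbsent(serverAddress, k -> new LinkedBlockingQueue<>()).offer(message);
    }

    /** One pass of the sender loop: merge everything pending per address into a single batch. */
    Map<String, List<String>> drainOnce() {
        Map<String, List<String>> batches = new HashMap<>();
        basketMap.forEach((address, basket) -> {
            List<String> batch = new ArrayList<>();
            basket.drainTo(batch);      // removes all currently queued messages, in order
            if (!batch.isEmpty()) {
                batches.put(address, batch);
            }
        });
        return batches;
    }
}
```

Merging several small requests to the same server into one network write is the point of the buffer: callers return quickly after enqueueing, and the sender thread pays the I/O cost once per batch.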

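Resource registration

RM registration

Every service can be both a transaction initiator and a transaction participant, so each service instance is a TM and an RM at the same time.
GlobalTransactionScanner also extends AbstractAutoProxyCreator,
so wrapIfNecessary is executed once a bean has been loaded.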

@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    try {
        synchronized (PROXYED_SET) {
            if (PROXYED_SET.contains(beanName)) {
                return bean;
            }
            interceptor = null;
            //check TCC proxy
            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                    (ConfigurationChangeListener)interceptor);
            } else {
                Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                if (!existsAnnotation(new Class[]{serviceInterface})
                    && !existsAnnotation(interfacesIfJdk)) {
                    return bean;
                }
                if (globalTransactionalInterceptor == null) {
                    globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        (ConfigurationChangeListener)globalTransactionalInterceptor);
                }
                interceptor = globalTransactionalInterceptor;
            }
            LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]",
                bean.getClass().getName(), beanName, interceptor.getClass().getName());
            if (!AopUtils.isAopProxy(bean)) {
                bean = super.wrapIfNecessary(bean, beanName, cacheKey);
            } else {
                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                for (Advisor avr : advisor) {
                    advised.addAdvisor(0, avr);
                }
            }
            PROXYED_SET.add(beanName);
            return bean;
        }
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}

public static boolean isTccAutoProxy(Object bean, String beanName, ApplicationContext applicationContext) {
    boolean isRemotingBean = parserRemotingServiceInfo(bean, beanName);
    //get RemotingBean description
    RemotingDesc remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
    //is remoting bean
    if (isRemotingBean) {
        if (remotingDesc != null && remotingDesc.getProtocol() == Protocols.IN_JVM) {
            //LocalTCC
            return isTccProxyTargetBean(remotingDesc);
        } else {
            // sofa:reference / dubbo:reference, factory bean
            return false;
        }
    } else {
        if (remotingDesc == null) {
            //check FactoryBean
            if (isRemotingFactoryBean(bean, beanName, applicationContext)) {
                remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
                return isTccProxyTargetBean(remotingDesc);
            } else {
                return false;
            }
        } else {
            return isTccProxyTargetBean(remotingDesc);
        }
    }
}

If a method carries the TwoPhaseBusinessAction annotation, the resource is registered with seata-server:

public RemotingDesc parserRemotingServiceInfo(Object bean, String beanName, RemotingParser remotingParser) {
    RemotingDesc remotingBeanDesc = remotingParser.getServiceDesc(bean, beanName);
    if (remotingBeanDesc == null) {
        return null;
    }
    remotingServiceMap.put(beanName, remotingBeanDesc);

    Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();
    Method[] methods = interfaceClass.getMethods();
    if (remotingParser.isService(bean, beanName)) {
        try {
            //service bean, registry resource
            Object targetBean = remotingBeanDesc.getTargetBean();
            for (Method m : methods) {
                TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
                if (twoPhaseBusinessAction != null) {
                    TCCResource tccResource = new TCCResource();
                    tccResource.setActionName(twoPhaseBusinessAction.name());
                    tccResource.setTargetBean(targetBean);
                    tccResource.setPrepareMethod(m);
                    tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
                    tccResource.setCommitMethod(ReflectionUtil.getMethod(interfaceClass,
                        twoPhaseBusinessAction.commitMethod(), new Class[] {BusinessActionContext.class}));
                    tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
                    tccResource.setRollbackMethod(ReflectionUtil.getMethod(interfaceClass,
                        twoPhaseBusinessAction.rollbackMethod(), new Class[] {BusinessActionContext.class}));
                    //registry tcc resource
                    DefaultResourceManager.get().registerResource(tccResource);
                }
            }
        } catch (Throwable t) {
            throw new FrameworkException(t, "parser remoting service error");
        }
    }
    if (remotingParser.isReference(bean, beanName)) {
        //reference bean, TCC proxy
        remotingBeanDesc.setReference(true);
    }
    return remotingBeanDesc;
}

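The registration message is assembled and sent to seata-server.

As the comment in wrapIfNecessary says (TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC), the TCC interceptor is meant for reference beans and LocalTCC beans.
Because this bean is a provider, the RM side is not proxied.
Now look at the server-side logic for registering an RM:
io.seata.core.rpc.netty.ChannelManager#registerRMChannel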

public static void registerRMChannel(RegisterRMRequest resourceManagerRequest, Channel channel)
    throws IncompatibleVersionException {
    Version.checkVersion(resourceManagerRequest.getVersion());
    Set<String> dbkeySet = dbKeytoSet(resourceManagerRequest.getResourceIds());
    RpcContext rpcContext;
    if (!IDENTIFIED_CHANNELS.containsKey(channel)) {
        rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.RMROLE, resourceManagerRequest.getVersion(),
            resourceManagerRequest.getApplicationId(), resourceManagerRequest.getTransactionServiceGroup(),
            resourceManagerRequest.getResourceIds(), channel);
        rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
    } else {
        rpcContext = IDENTIFIED_CHANNELS.get(channel);
        rpcContext.addResources(dbkeySet);
    }
    if (dbkeySet == null || dbkeySet.isEmpty()) { return; }
    for (String resourceId : dbkeySet) {
        String clientIp;
        ConcurrentMap<Integer, RpcContext> portMap = CollectionUtils.computeIfAbsent(RM_CHANNELS, resourceId, key -> new ConcurrentHashMap<>())
            .computeIfAbsent(resourceManagerRequest.getApplicationId(), key -> new ConcurrentHashMap<>())
            .computeIfAbsent(clientIp = ChannelUtil.getClientIpFromChannel(channel), key -> new ConcurrentHashMap<>());
        rpcContext.holdInResourceManagerChannels(resourceId, portMap);
        updateChannelsResource(resourceId, clientIp, resourceManagerRequest.getApplicationId());
    }
}

The RM information is stored in RM_CHANNELS, which records the connection details.
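
Judging from the chained computeIfAbsent calls, RM_CHANNELS is a nested map keyed by resource ID, then application ID, then client IP, then port. A simplified sketch of such a registry follows; RmRegistrySketch is a hypothetical class, a plain string label stands in for the channel context, and the lookup simply returns the first channel it finds.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical registry: resourceId -> applicationId -> clientIp -> clientPort -> channel label. */
class RmRegistrySketch {
    private final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, String>>>>
        rmChannels = new ConcurrentHashMap<>();

    /** Record that the given client connection serves the given resource. */
    void register(String resourceId, String applicationId, String clientIp, int port, String channel) {
        rmChannels.computeIfAbsent(resourceId, k -> new ConcurrentHashMap<>())
            .computeIfAbsent(applicationId, k -> new ConcurrentHashMap<>())
            .computeIfAbsent(clientIp, k -> new ConcurrentHashMap<>())
            .put(port, channel);
    }

    /** Return any channel registered for the resource, or null when none is known. */
    String findChannel(String resourceId) {
        ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, String>>> apps = rmChannels.get(resourceId);
        if (apps == null) {
            return null;
        }
        for (ConcurrentMap<String, ConcurrentMap<Integer, String>> ips : apps.values()) {
            for (ConcurrentMap<Integer, String> ports : ips.values()) {
                for (String channel : ports.values()) {
                    return channel;
                }
            }
        }
        return null;
    }
}
```

Keying the outer map by resource ID is what lets the server start from a branch's resource ID during phase two and reach a live connection of a service that owns that resource.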

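TM registration

The client registers its TM information

Proxying where necessary

In io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary:
If the bean is a reference bean and carries the TwoPhaseBusinessAction annotation, it has to be proxied for further enhancement.
TccActionInterceptor is used as the proxy (in effect, one more proxy layered on top of the proxy Dubbo already created for the service reference).

Classes that initiate transactions with the GlobalTransactional annotation are proxied as well: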

private boolean existsAnnotation(Class<?>[] classes) {
    if (CollectionUtils.isNotEmpty(classes)) {
        for (Class<?> clazz : classes) {
            if (clazz == null) {
                continue;
            }
            GlobalTransactional trxAnno = clazz.getAnnotation(GlobalTransactional.class);
            if (trxAnno != null) {
                return true;
            }
            Method[] methods = clazz.getMethods();
            for (Method method : methods) {
                trxAnno = method.getAnnotation(GlobalTransactional.class);
                if (trxAnno != null) {
                    return true;
                }
                GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
                if (lockAnno != null) {
                    return true;
                }
            }
        }
    }
    return false;
}

GlobalTransactionalInterceptor performs the proxying for these beans.

Entering the global transaction interceptor

The call passes through GlobalTransactionalInterceptor:

@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class<?> targetClass =
        methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        final GlobalTransactional globalTransactionalAnnotation =
            getAnnotation(method, targetClass, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
        boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
        if (!localDisable) {
            if (globalTransactionalAnnotation != null) {
                return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
            } else if (globalLockAnnotation != null) {
                return handleGlobalLock(methodInvocation, globalLockAnnotation);
            }
        }
    }
    return methodInvocation.proceed();
}

If the GlobalTransactional annotation is present, the global transaction is handled.
Execution then enters the transaction template:
io.seata.tm.api.TransactionalTemplate#execute

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. Get transactionInfo
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();

    // 1.2 Handle the transaction propagation.
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        switch (propagation) {
            case NOT_SUPPORTED:
                // If transaction is existing, suspend it.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                }
                // Execute without transaction and return.
                return business.execute();
            case REQUIRES_NEW:
                // If transaction is existing, suspend it, and then begin new transaction.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                    tx = GlobalTransactionContext.createNew();
                }
                // Continue and execute with new transaction
                break;
            case SUPPORTS:
                // If transaction is not existing, execute without transaction.
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                // Continue and execute with new transaction
                break;
            case REQUIRED:
                // If current transaction is existing, execute with current transaction,
                // else continue and execute with new transaction.
                break;
            case NEVER:
                // If transaction is existing, throw exception.
                if (existingTransaction(tx)) {
                    throw new TransactionException(
                        String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s",
                            tx.getXid()));
                } else {
                    // Execute without transaction and return.
                    return business.execute();
                }
            case MANDATORY:
                // If transaction is not existing, throw exception.
                if (notExistingTransaction(tx)) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                // Continue and execute with current transaction.
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
        if (tx == null) {
            tx = GlobalTransactionContext.createNew();
        }

        // set current tx config to holder
        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

        try {
            // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
            //    else do nothing. Of course, the hooks will still be triggered.
            beginTransaction(txInfo, tx);

            Object rs;
            try {
                // Do Your Business
                rs = business.execute();
            } catch (Throwable ex) {
                // 3. The needed business exception to rollback.
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            resumeGlobalLockConfig(previousConfig);
            triggerAfterCompletion();
            cleanUp();
        }
    } finally {
        // If the transaction is suspended, resume it.
        if (suspendedResourcesHolder != null) {
            tx.resume(suspendedResourcesHolder);
        }
    }
}

The method fetches the current transaction and then decides how to proceed based on the transaction propagation setting.
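
The propagation switch in execute can be condensed into a decision table. The sketch below restates the branches of that switch as a pure function; PropagationSketch and its Action enum are hypothetical names introduced only for this illustration.

```java
/** Hypothetical summary of the propagation switch: what happens for each setting. */
class PropagationSketch {
    enum Propagation { REQUIRED, REQUIRES_NEW, NOT_SUPPORTED, SUPPORTS, NEVER, MANDATORY }

    enum Action {
        JOIN_EXISTING,               // run inside the transaction that is already active
        BEGIN_NEW,                   // no suspension needed, start a new global transaction
        SUSPEND_AND_BEGIN_NEW,       // suspend the active transaction, then start a new one
        RUN_WITHOUT_TX,              // just run the business code
        SUSPEND_AND_RUN_WITHOUT_TX,  // suspend the active transaction, run without one
        FAIL                         // throw a TransactionException
    }

    static Action decide(Propagation propagation, boolean existingTransaction) {
        switch (propagation) {
            case NOT_SUPPORTED:
                return existingTransaction ? Action.SUSPEND_AND_RUN_WITHOUT_TX : Action.RUN_WITHOUT_TX;
            case REQUIRES_NEW:
                return existingTransaction ? Action.SUSPEND_AND_BEGIN_NEW : Action.BEGIN_NEW;
            case SUPPORTS:
                return existingTransaction ? Action.JOIN_EXISTING : Action.RUN_WITHOUT_TX;
            case REQUIRED:
                return existingTransaction ? Action.JOIN_EXISTING : Action.BEGIN_NEW;
            case NEVER:
                return existingTransaction ? Action.FAIL : Action.RUN_WITHOUT_TX;
            case MANDATORY:
                return existingTransaction ? Action.JOIN_EXISTING : Action.FAIL;
            default:
                throw new IllegalArgumentException("Not Supported Propagation:" + propagation);
        }
    }
}
```

For example, `PropagationSketch.decide(Propagation.REQUIRED, false)` yields `BEGIN_NEW`, which corresponds to step 1.3 creating a transaction with the Launcher role.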

Beginning the global transaction

beginTransaction(txInfo, tx);

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
    if (response.getResultCode() == ResultCode.Failed) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    }
    return response.getXid();
}

This sends the request to begin a global transaction. If it is the first connection to seata-server, the TM is registered as well.

Server-side logic
Registering the TM channel:
io.seata.core.rpc.netty.ChannelManager#registerTMChannel

public static void registerTMChannel(RegisterTMRequest request, Channel channel)
    throws IncompatibleVersionException {
    Version.checkVersion(request.getVersion());
    RpcContext rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.TMROLE, request.getVersion(),
        request.getApplicationId(),
        request.getTransactionServiceGroup(),
        null, channel);
    rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
    String clientIdentified = rpcContext.getApplicationId() + Constants.CLIENT_ID_SPLIT_CHAR
        + ChannelUtil.getClientIpFromChannel(channel);
    ConcurrentMap<Integer, RpcContext> clientIdentifiedMap = CollectionUtils.computeIfAbsent(TM_CHANNELS,
        clientIdentified, key -> new ConcurrentHashMap<>());
    rpcContext.holdInClientChannels(clientIdentifiedMap);
}

Beginning the global transaction on the server

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
        timeout);
    MDC.put(RootContext.MDC_KEY_XID, session.getXid());
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

    session.begin();

    // transaction start event
    eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
        session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));

    return session.getXid();
}

A GlobalSession is created and its xid is returned.
The server also has to manage the session, so it is put into sessionMap:

@Override
public void addGlobalSession(GlobalSession session) throws TransactionException {
    super.addGlobalSession(session);
    sessionMap.put(session.getXid(), session);
}

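Executing the business logic

rs = business.execute();

The business code now runs. In TCC mode this usually means calling the prepare method of each business system.
As mentioned earlier, reference services are proxied by TccActionInterceptor: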

@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
    // ... (the guard checks and the lookup of method, businessAction, xid and
    //      previousBranchType are omitted in this excerpt)
    try {
        Object[] methodArgs = invocation.getArguments();
        //Handler the TCC Aspect
        Map<String, Object> ret = actionInterceptorHandler.proceed(method, methodArgs, xid, businessAction,
            invocation::proceed);
        //return the final result
        return ret.get(Constants.TCC_METHOD_RESULT);
    }
    finally {
        //if not TCC, unbind branchType
        if (BranchType.TCC != previousBranchType) {
            RootContext.unbindBranchType();
        }
        //MDC remove branchId
        MDC.remove(RootContext.MDC_KEY_BRANCH_ID);
    }
    // ... (non-TCC path)
    return invocation.proceed();
}

protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction,
                                     BusinessActionContext actionContext) {
    String actionName = actionContext.getActionName();
    String xid = actionContext.getXid();
    //
    Map<String, Object> context = fetchActionRequestContext(method, arguments);
    context.put(Constants.ACTION_START_TIME, System.currentTimeMillis());

    //init business context
    initBusinessContext(context, method, businessAction);
    //Init running environment context
    initFrameworkContext(context);
    actionContext.setActionContext(context);

    //init applicationData
    Map<String, Object> applicationContext = new HashMap<>(4);
    applicationContext.put(Constants.TCC_ACTION_CONTEXT, context);
    String applicationContextStr = JSON.toJSONString(applicationContext);
    try {
        //registry branch record
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid,
            applicationContextStr, null);
        return String.valueOf(branchId);
    } catch (Throwable t) {
        String msg = String.format("TCC branch Register error, xid: %s", xid);
        LOGGER.error(msg, t);
        throw new FrameworkException(t, msg);
    }
}

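A branch transaction has to be started, which means sending the transaction's details to the server:
{"actionContext":{"a":1,"action-start-time":1659771487917,"sys::prepare":"prepare","sys::rollback":"rollback","sys::commit":"commit","host-name":"10.100.100.27","actionName":"DubboTccActionOne"}}
These values are read from the annotations on the interface.

The RM registered the matching information with the server when it started.
Server-side logic for starting the branch transaction:
io.seata.server.coordinator.AbstractCore#branchRegister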

@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                           String applicationData, String lockKeys) throws TransactionException {
    GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
    return SessionHolder.lockAndExecute(globalSession, () -> {
        globalSessionStatusCheck(globalSession);
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
            applicationData, lockKeys, clientId);
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
        branchSessionLock(globalSession, branchSession);
        try {
            globalSession.addBranch(branchSession);
        } catch (RuntimeException ex) {
            branchSessionUnlock(branchSession);
            throw new BranchTransactionException(FailedToAddBranch, String
                .format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
                    branchSession.getBranchId()), ex);
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
                globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
        }
        return branchSession.getBranchId();
    });
}

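A BranchSession is created and added under the GlobalSession. Note the call to branchSessionLock(globalSession, branchSession) here.

It turns out that this method is implemented only for AT mode. In other words, TCC mode performs no locking on the database.
After the branch transaction has been opened, the Dubbo service is invoked remotely and the business logic runs.

Rollback on exception

1. Exception in the prepare phase

This refers to an exception raised by business code during prepare. The transaction initiator (TM) then sends a rollback request: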

@Override
public void rollback() throws TransactionException {
    if (role == GlobalTransactionRole.Participant) {
        // Participant has no responsibility of rollback
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
        }
        return;
    }
    assertXIDNotNull();

    int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
    try {
        while (retry > 0) {
            try {
                status = transactionManager.rollback(xid);
                break;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                retry--;
                if (retry == 0) {
                    throw new TransactionException("Failed to report global rollback", ex);
                }
            }
        }
    } finally {
        if (xid.equals(RootContext.getXID())) {
            suspend();
        }
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("[{}] rollback status: {}", xid, status);
    }
}

@Override
public GlobalStatus rollback(String xid) throws TransactionException {
    GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
    globalRollback.setXid(xid);
    GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
    return response.getGlobalStatus();
}

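As the code shows, a failed rollback request is retried, up to a retry count that defaults to 5: whenever an exception occurs, the request is sent again. It is also a synchronous request, so it returns only after the server has answered with a result.
Server-side logic
io.seata.server.coordinator.DefaultCore#rollback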

@Override
public GlobalStatus rollback(String xid) throws TransactionException {
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // just lock changeStatus
    boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
        globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            globalSession.changeStatus(GlobalStatus.Rollbacking);
            return true;
        }
        return false;
    });
    if (!shouldRollBack) {
        return globalSession.getStatus();
    }

    doGlobalRollback(globalSession, false);
    return globalSession.getStatus();
}

The globalSession status is changed to GlobalStatus.Rollbacking,
and execution enters doGlobalRollback,
which loops over the branch transactions and rolls each one back:

Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {
    BranchStatus currentBranchStatus = branchSession.getStatus();
    if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
        globalSession.removeBranch(branchSession);
        return CONTINUE;
    }
    try {
        BranchStatus branchStatus = branchRollback(globalSession, branchSession);
        switch (branchStatus) {
            case PhaseTwo_Rollbacked:
                globalSession.removeBranch(branchSession);
                LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                return CONTINUE;
            case PhaseTwo_RollbackFailed_Unretryable:
                SessionHelper.endRollbackFailed(globalSession);
                LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                return false;
            default:
                LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                if (!retrying) {
                    globalSession.queueToRetryRollback();
                }
                return false;
        }
    } catch (Exception ex) {
        StackTraceLogger.error(LOGGER, ex,
            "Rollback branch transaction exception, xid = {} branchId = {} exception = {}",
            new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});
        if (!retrying) {
            globalSession.queueToRetryRollback();
        }
        throw new TransactionException(ex);
    }
});

@Override
public BranchStatus branchRollback(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
    try {
        BranchRollbackRequest request = new BranchRollbackRequest();
        request.setXid(branchSession.getXid());
        request.setBranchId(branchSession.getBranchId());
        request.setResourceId(branchSession.getResourceId());
        request.setApplicationData(branchSession.getApplicationData());
        request.setBranchType(branchSession.getBranchType());
        return branchRollbackSend(request, globalSession, branchSession);
    } catch (IOException | TimeoutException e) {
        throw new BranchTransactionException(FailedToSendBranchRollbackRequest,
            String.format("Send branch rollback failed, xid = %s branchId = %s",
                branchSession.getXid(), branchSession.getBranchId()), e);
    }
}

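A rollback message is sent to the RM that owns each branch transaction's resource (the RM has registered its resources with seata-server, so seata-server can find the RM from the resource). This is again a synchronous message.
The RM decides whether the rollback succeeded from the return value of the rollback method:

return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;

Each rollback status is handled differently: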

switch (branchStatus) {
    // rollback succeeded: remove the branch transaction
    case PhaseTwo_Rollbacked:
        globalSession.removeBranch(branchSession);
        LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
        return CONTINUE;
    // rollback failed and cannot be retried: the globalSession is ended as a failed rollback
    case PhaseTwo_RollbackFailed_Unretryable:
        SessionHelper.endRollbackFailed(globalSession);
        LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
        return false;
    default:
        // rollback failed: the globalSession is put into the retry queue and rolled back again later
        LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
        if (!retrying) {
            globalSession.queueToRetryRollback();
        }
        return false;
}

Here is how the scheduled task processes the rollback retry queue:

protected void handleRetryRollbacking() {
    Collection<GlobalSession> rollbackingSessions = SessionHolder.getRetryRollbackingSessionManager().allSessions();
    if (CollectionUtils.isEmpty(rollbackingSessions)) {
        return;
    }
    long now = System.currentTimeMillis();
    SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {
        try {
            // prevent repeated rollback
            if (rollbackingSession.getStatus().equals(GlobalStatus.Rollbacking) && !rollbackingSession.isDeadSession()) {
                //The function of this 'return' is 'continue'.
                return;
            }
            if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT.toMillis(), rollbackingSession.getBeginTime())) {
                if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) {
                    rollbackingSession.clean();
                }
                /**
                 * Prevent thread safety issues
                 */
                SessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(rollbackingSession);
                LOGGER.info("Global transaction rollback retry timeout and has removed [{}]", rollbackingSession.getXid());
                //The function of this 'return' is 'continue'.
                return;
            }
            rollbackingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
            core.doGlobalRollback(rollbackingSession, true);
        } catch (TransactionException ex) {
            LOGGER.info("Failed to retry rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage());
        }
    });
}

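By default the retry never times out. The limit can be set through this configuration key:

/**
 * The constant MAX_ROLLBACK_RETRY_TIMEOUT.
 */
String MAX_ROLLBACK_RETRY_TIMEOUT = SERVER_PREFIX + "maxRollbackRetryTimeout";

In other words, with the default setting a rollback that keeps failing is retried indefinitely.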

Back in the TM's rollback method: as soon as seata-server replies, the loop ends. The rollback may not actually be complete at that point, but the rest is left to seata-server, which keeps retrying the rollback.

while (retry > 0) {
    try {
        status = transactionManager.rollback(xid);
        break;
    } catch (Throwable ex) {
        LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
        retry--;
        if (retry == 0) {
            throw new TransactionException("Failed to report global rollback", ex);
        }
    }
}
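
The TM-side pattern (a bounded number of attempts that stops at the first reply) can be isolated into a small helper. This is a generic sketch rather than Seata code: ReportRetrySketch and callWithRetry are hypothetical names, and any exception counts as a failed attempt.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper showing the bounded report-and-retry pattern used by the TM. */
class ReportRetrySketch {
    /**
     * Invokes the call until it returns without throwing, at most maxAttempts times.
     * The first successful reply ends the loop; the last failure is rethrown wrapped.
     */
    static <T> T callWithRetry(Callable<T> call, int maxAttempts) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception ex) {
                last = ex; // remember the failure and try again
            }
        }
        throw new IllegalStateException("Failed after " + maxAttempts + " attempts", last);
    }
}
```

The helper only guarantees that the request was delivered and answered. As described above, whether the rollback itself has finished is a separate question that the server keeps working on.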
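2. Rollback on transaction timeout

The server detects timed-out transactions and checks the rollback status of global transactions.
This is done by the scheduled task io.seata.server.coordinator.DefaultCoordinator#timeoutCheck, which runs every 5 seconds by default.

public boolean isTimeout() {
    return (System.currentTimeMillis() - beginTime) > timeout;
}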

Transaction commit

A commit is always initiated by the TM:

// 4. everything is fine, commit.
commitTransaction(tx);

public void commit() throws TransactionException {
    if (role == GlobalTransactionRole.Participant) {
        // Participant has no responsibility of committing
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
        }
        return;
    }
    assertXIDNotNull();
    int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
    try {
        while (retry > 0) {
            try {
                status = transactionManager.commit(xid);
                break;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                retry--;
                if (retry == 0) {
                    throw new TransactionException("Failed to report global commit", ex);
                }
            }
        }
    } finally {
        if (xid.equals(RootContext.getXID())) {
            suspend();
        }
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("[{}] commit status: {}", xid, status);
    }
}

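The TM sends a request to seata-server to commit the global transaction, and it retries on exceptions here too.
Now the server-side commit logic: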

Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
    // if not retrying, skip the canBeCommittedAsync branches
    if (!retrying && branchSession.canBeCommittedAsync()) {
        return CONTINUE;
    }

    BranchStatus currentStatus = branchSession.getStatus();
    if (currentStatus == BranchStatus.PhaseOne_Failed) {
        globalSession.removeBranch(branchSession);
        return CONTINUE;
    }
    try {
        BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);

        switch (branchStatus) {
            case PhaseTwo_Committed:
                globalSession.removeBranch(branchSession);
                return CONTINUE;
            case PhaseTwo_CommitFailed_Unretryable:
                if (globalSession.canBeCommittedAsync()) {
                    LOGGER.error(
                        "Committing branch transaction[{}], status: PhaseTwo_CommitFailed_Unretryable, please check the business log.", branchSession.getBranchId());
                    return CONTINUE;
                } else {
                    SessionHelper.endCommitFailed(globalSession);
                    LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
                    return false;
                }
            default:
                if (!retrying) {
                    globalSession.queueToRetryCommit();
                    return false;
                }
                if (globalSession.canBeCommittedAsync()) {
                    LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
                        branchSession.getBranchId(), branchStatus);
                    return CONTINUE;
                } else {
                    LOGGER.error(
                        "Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
                    return false;
                }
        }
    } catch (Exception ex) {
        StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
            new String[] {branchSession.toString()});
        if (!retrying) {
            globalSession.queueToRetryCommit();
            throw new TransactionException(ex);
        }
    }
    return CONTINUE;
});

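As with rollback, the server walks through the branch transactions one by one and tries to commit each by calling its commit method. A branch that commits successfully is removed from the global session; if a commit fails, the global transaction is put on the retry queue and the commit is attempted again later.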
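The overall shape of that loop (commit each branch, remove it on success, queue the transaction for retry on failure) can be modelled in a few lines. The sketch below is a simplification under stated assumptions: GlobalTx, retryQueue, and commitAll are hypothetical names, and the asynchronous-commit and unretryable cases from the real code are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

final class BranchCommitSketch {
    /** Hypothetical stand-in for a global session that holds branch ids. */
    static final class GlobalTx {
        final String xid;
        final List<Long> branches = new ArrayList<>();

        GlobalTx(String xid, List<Long> branchIds) {
            this.xid = xid;
            this.branches.addAll(branchIds);
        }
    }

    final Queue<GlobalTx> retryQueue = new ArrayDeque<>();

    /** Returns true when every branch committed; otherwise queues the transaction for retry. */
    boolean commitAll(GlobalTx tx, Predicate<Long> branchCommit) {
        Iterator<Long> it = tx.branches.iterator();
        while (it.hasNext()) {
            long branchId = it.next();
            if (branchCommit.test(branchId)) {
                it.remove();          // committed: drop the branch from the session
            } else {
                retryQueue.add(tx);   // failed: the remaining branches are retried later
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        BranchCommitSketch tc = new BranchCommitSketch();
        GlobalTx tx = new GlobalTx("xid-1", List.of(1L, 2L, 3L));
        boolean first = tc.commitAll(tx, id -> id != 2L);              // branch 2 fails on the first pass
        boolean second = tc.commitAll(tc.retryQueue.poll(), id -> true); // retry succeeds
        System.out.println(first + " " + second + " " + tx.branches);
    }
}
```

Because a retry calls commit again on a branch whose first attempt may have partly succeeded, the branch's commit method has to tolerate being invoked more than once.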

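The three anomalies of the TCC transaction model

Any implementation of the TCC model runs into three anomalies, and a production system has to guard against each of them.
1. Empty rollback
Definition: the cancel method runs and rolls back although the try method was never called or did not complete successfully.
The first case is easy to see: if try was never called, no resources were reserved, so there is nothing to roll back.
What does "try did not complete successfully" mean?
Recall the phase-one try pseudocode in the previous section. The try method runs inside a local transaction, so an exception during try rolls that local transaction back (this is the local transaction of try rolling back, not the cancel method). Everything try did is undone, so in principle there is no need to call cancel.
In practice, however, once try throws an exception the cancel method is always invoked, and the result is an empty rollback.
Solution:
The idea is simple: before the cancel method does any work, it must know whether try executed successfully.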

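2. Idempotency

The TCC model requires that a failed confirm or cancel be retried until it succeeds.
This is where idempotency comes in: within one global transaction, confirm and cancel must produce the same result no matter how many times they are executed.
Solution:
The idea is simple: use an idempotency key to detect and skip repeated executions.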

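3. Hanging
Hanging means that, because of a network problem, the RM does not receive the try request at first; after Rollback has already been executed, the RM then receives the try request and reserves resources successfully. By that time the global transaction has ended, so the reserved resources can never be released.
When the transaction coordinator invokes the phase-one Try of a TCC service, the call may time out because of network congestion (a Dubbo call timeout, for example). The transaction manager then triggers the phase-two rollback and invokes the Cancel operation of the TCC service, and that Cancel call does not time out.
Afterwards, the phase-one Try packet that was stuck in the network reaches the TCC service, so the phase-two Cancel request ends up executing before the phase-one Try request. After executing the late Try, the TCC service will never receive a phase-two Confirm or Cancel, which leaves it hanging.
Solution:
The idea is simple: before the try method touches any resource, check whether cancel has already run; correspondingly, the cancel method must record its execution state once it has run.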
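4. Summary
There are many practical ways to handle these three anomalies. One of them is to maintain a transaction state table that records every phase each transaction has executed.
Idempotency: before executing confirm or cancel, query the state table to see whether the current global transaction has already executed confirm or cancel.
Empty rollback: before executing cancel, query the state table to see whether the current global transaction has executed try successfully.
Hanging: before executing try, query the state table to see whether the current global transaction has already executed cancel.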
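A minimal in-memory sketch of these three checks, applied to the inventory-freezing scenario from earlier in the article. The class name, the Phase enum, and the HashMap standing in for the transaction state table are all assumptions for illustration; a real implementation would keep the state in the same database and local transaction as the business data.

```java
import java.util.HashMap;
import java.util.Map;

final class InventoryTccSketch {
    enum Phase { TRIED, CONFIRMED, CANCELLED }

    private final Map<String, Phase> txState = new HashMap<>();    // stands in for the state table
    private final Map<String, Integer> reserved = new HashMap<>(); // quantity frozen per xid
    int available;
    int frozen;

    InventoryTccSketch(int available) {
        this.available = available;
    }

    synchronized boolean tryReserve(String xid, int qty) {
        Phase p = txState.get(xid);
        if (p == Phase.CANCELLED) return false; // hanging guard: cancel already ran for this xid
        if (p != null) return true;             // duplicate try: already processed
        if (available < qty) return false;      // business check failed
        available -= qty;
        frozen += qty;
        reserved.put(xid, qty);
        txState.put(xid, Phase.TRIED);
        return true;
    }

    synchronized boolean confirm(String xid) {
        Phase p = txState.get(xid);
        if (p == Phase.CONFIRMED) return true;  // idempotent: already confirmed
        if (p != Phase.TRIED) return false;     // nothing reserved to confirm
        frozen -= reserved.remove(xid);
        txState.put(xid, Phase.CONFIRMED);
        return true;
    }

    synchronized boolean cancel(String xid) {
        Phase p = txState.get(xid);
        if (p == Phase.CANCELLED) return true;  // idempotent: already cancelled
        if (p == null) {                        // empty rollback: record it, touch no stock
            txState.put(xid, Phase.CANCELLED);
            return true;
        }
        if (p != Phase.TRIED) return false;     // already confirmed, cannot cancel
        int qty = reserved.remove(xid);
        frozen -= qty;
        available += qty;
        txState.put(xid, Phase.CANCELLED);
        return true;
    }

    public static void main(String[] args) {
        InventoryTccSketch stock = new InventoryTccSketch(10);
        System.out.println(stock.cancel("tx-1"));        // true: empty rollback is recorded
        System.out.println(stock.tryReserve("tx-1", 3)); // false: late try is rejected
        System.out.println(stock.tryReserve("tx-2", 4)); // true: 4 units frozen
        System.out.println(stock.confirm("tx-2"));       // true
        System.out.println(stock.confirm("tx-2"));       // true: repeated confirm changes nothing
        System.out.println(stock.available + " " + stock.frozen); // 6 0
    }
}
```

Recording the empty rollback is what makes the hanging check possible: the late try for tx-1 finds the CANCELLED entry and reserves nothing.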

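Transaction state table (handling the three anomalies)

This article uses version 1.4.2, whose source code does not yet include this mechanism.
Seata 1.5.0 adds a transaction control table named tcc_fence_log to handle these anomalies. The useTCCFence attribute of the @TwoPhaseBusinessAction annotation, mentioned in the previous section, controls whether the mechanism is enabled; its default value is false.
The DDL for tcc_fence_log is as follows (MySQL syntax):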

CREATE TABLE IF NOT EXISTS `tcc_fence_log`
(
    `xid`           VARCHAR(128)  NOT NULL COMMENT 'global id',
    `branch_id`     BIGINT        NOT NULL COMMENT 'branch id',
    `action_name`   VARCHAR(64)   NOT NULL COMMENT 'action name',
    `status`        TINYINT       NOT NULL COMMENT 'status(tried:1;committed:2;rollbacked:3;suspended:4)',
    `gmt_create`    DATETIME(3)   NOT NULL COMMENT 'create time',
    `gmt_modified`  DATETIME(3)   NOT NULL COMMENT 'update time',
    PRIMARY KEY (`xid`, `branch_id`),
    KEY `idx_gmt_modified` (`gmt_modified`),
    KEY `idx_status` (`status`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

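Ensuring idempotency
When committing, Seata first checks whether tcc_fence_log already contains a record. If it does, Seata inspects the recorded status and returns accordingly: when the status is already STATUS_COMMITTED, the commit is not executed a second time, which guarantees idempotency. If tcc_fence_log contains no record, one is inserted so that later retries can check it.

Rollback follows the same pattern as commit; its logic is in the rollbackFence method of the TCCFenceHandler class.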

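Preventing empty rollback
Seata's approach is to insert a record with status STATUS_TRIED into tcc_fence_log during the try phase. In the Rollback phase it checks whether that record exists, and if it does not, the rollback is not executed.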

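Preventing hanging
When the Rollback method executes, Seata first checks whether tcc_fence_log holds a record for the current xid. If not, it inserts a record with status STATUS_SUSPENDED and does not perform the rollback. A Try that arrives late for the same xid and branch then cannot insert its own STATUS_TRIED record, because the primary key (xid, branch_id) is already taken, so it does not reserve any resources.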
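The rollback-side behavior described in the last three subsections can be summarized as a decision on the status of the existing record. The sketch below is a model of that description, not the source of TCCFenceHandler: the class, the Action enum, and onRollback are hypothetical names, the status codes are taken from the column comment in the DDL above, and the handling of a record that is already committed is an assumption.

```java
final class RollbackFenceSketch {
    // Status codes as listed in the tcc_fence_log column comment.
    static final int TRIED = 1;
    static final int COMMITTED = 2;
    static final int ROLLBACKED = 3;
    static final int SUSPENDED = 4;

    enum Action { RUN_ROLLBACK, SKIP_ALREADY_DONE, INSERT_SUSPENDED_AND_SKIP, REJECT }

    /**
     * @param status status of the (xid, branch_id) record, or null when no record exists
     */
    static Action onRollback(Integer status) {
        if (status == null) {
            // No try was recorded: empty rollback. Leave a marker so a late try is blocked.
            return Action.INSERT_SUSPENDED_AND_SKIP;
        }
        switch (status) {
            case TRIED:
                return Action.RUN_ROLLBACK;       // normal case: release the reserved resources
            case ROLLBACKED:
            case SUSPENDED:
                return Action.SKIP_ALREADY_DONE;  // repeated rollback: idempotent no-op
            default:
                return Action.REJECT;             // e.g. COMMITTED; this branch is an assumption
        }
    }

    public static void main(String[] args) {
        System.out.println(onRollback(null));
        System.out.println(onRollback(TRIED));
        System.out.println(onRollback(SUSPENDED));
        System.out.println(onRollback(COMMITTED));
    }
}
```

In a database-backed version the status lookup, the status update, and the business rollback would need to share one local transaction, so that a crash between them cannot leave the record and the resources out of step.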
