Seata TCC Mode
What Is TCC
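TCC is a two-phase commit protocol for distributed transactions. The name stands for Try-Confirm-Cancel: resource reservation (Try), confirmation (Confirm) and cancellation (Cancel). The three phases mean the following:
Try phase: attempt the operation, perform all business checks (consistency) and reserve the required business resources (quasi-isolation).
Confirm phase: actually execute the business operation without any further business checks, using only the resources reserved in the Try phase. Confirm must be idempotent, because a failed Confirm is retried.
Cancel phase: cancel the operation and release the resources reserved in the Try phase. Cancel must also be idempotent, and exceptions in Cancel are handled essentially the same way as exceptions in Confirm.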
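In short:
Under the assumption that Confirm and Cancel always eventually succeed, TCC is still a 2PC protocol.
In a distributed environment it provides no global locking to control resource contention; it only defines the procedure for rolling back when something goes wrong.
That is also why it typically performs better than AT mode.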
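TCC is an intrusive distributed transaction solution. All three operations must be implemented by the business system itself, which is highly intrusive and makes the design relatively complex. The upside is that TCC does not depend on the database at all. It can manage resources across databases and across applications, and through this intrusive coding approach turns access to those different resources into one atomic operation (either everything commits or everything rolls back). This makes it better suited to distributed transactions in complex business scenarios.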
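Demo Scenario
Take order placement in an e-commerce system as the example. To keep the demo simple, the account service is left out, so only the order service and the inventory (storage) service are involved.
The logic is as follows:
The client calls the place-order API
Deduct stock
Create the order
Request completes
From this flow, the order service is the main business service and the initiator of the transaction, while the inventory service is a secondary business service that participates in the transaction decision.
Pseudocode for the Seata AT-mode solution: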
@GlobalTransactional
public Result<Void> createOrder(Long productId, Long num, .....) {
    // 1. Deduct stock
    reduceStorage();
    // 2. Create the order
    saveOrder();
}
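The @GlobalTransactional annotation starts a global transaction.
However, AT mode has limitations:
Low performance: resources stay locked for too long
It cannot handle cross-application transactions
So for a performance-sensitive order API, consider TCC mode and split the work into two phases. This shortens the time resources are locked over the whole flow and improves performance.
With TCC, the split looks like this:
1. Phase one: Try
The Try phase of TCC is essentially resource reservation. Here it freezes stock for the requested quantity, so the inventory table needs an extra column that holds the frozen stock.
Pseudocode: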
@Transactional
public boolean tryCreateOrder() {
    // Freeze the stock
    frozenStorage();
    // Create the order with status "pending confirmation"
    saveOrder();
    return true;
}
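Note: @Transactional opens a local transaction. If any exception occurs, the local transaction rolls back and the phase-two Cancel operation is executed as well.
2. Phase two: Confirm
Confirm commits the transaction after the phase-one Try has succeeded. It involves:
Releasing the stock frozen by Try (frozen stock - purchase quantity)
Updating the order status to completed
Pseudocode: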
@Transactional
public boolean confirm() {
    // Release the stock reserved by Try
    cleanFrozen();
    // Update the order status to completed
    updateOrder();
    return true;
}
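Note: if this returns false, the TCC specification requires retrying until Confirm completes.
3. Phase two: Cancel
Cancel runs after the phase-one Try has failed and rolls back the reserved resources. It involves:
Restoring the frozen stock (frozen stock - purchase quantity, stock + purchase quantity)
Deleting the order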
@Transactional
public boolean cancel() {
    // Return the stock reserved by Try
    rollbackFrozen();
    // Delete the order
    delOrder();
    return true;
}
Note: if this returns false, the TCC specification requires retrying until Cancel completes.
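To make the stock arithmetic of the three phases concrete, here is a minimal in-memory sketch. It assumes a single product row with hypothetical `stock` and `frozen` fields; the class and method names are illustrative and do not come from any library. It deliberately ignores idempotency, empty rollback and suspension, which are covered in the section on the three TCC anomalies below.

```java
// Minimal in-memory sketch of the stock arithmetic behind Try/Confirm/Cancel.
// Names are hypothetical; a real service would update `stock` and `frozen`
// columns inside a local database transaction.
class InventorySketch {
    private int stock;   // available stock
    private int frozen;  // stock reserved by Try but not yet confirmed

    InventorySketch(int stock) {
        this.stock = stock;
    }

    // Try: run the business check, then move `num` units from stock into frozen.
    boolean tryFreeze(int num) {
        if (num <= 0 || stock < num) {
            return false; // check failed, nothing reserved
        }
        stock -= num;
        frozen += num;
        return true;
    }

    // Confirm: consume the reservation (frozen - num); no business checks here.
    void confirm(int num) {
        frozen -= num;
    }

    // Cancel: release the reservation (frozen - num, stock + num).
    void cancel(int num) {
        frozen -= num;
        stock += num;
    }

    int stock() {
        return stock;
    }

    int frozen() {
        return frozen;
    }
}
```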
Seata TCC Mode Example (Dubbo)
Suppose a business operation needs service A and service B to complete one transaction together. In service A we define a TCC interface:
public interface TccActionOne {
    @TwoPhaseBusinessAction(name = "DubboTccActionOne", commitMethod = "commit", rollbackMethod = "rollback")
    public boolean prepare(BusinessActionContext actionContext,
                           @BusinessActionContextParameter(paramName = "a") int a);

    public boolean commit(BusinessActionContext actionContext);

    public boolean rollback(BusinessActionContext actionContext);
}
Likewise, in service B we define another TCC interface:
public interface TccActionTwo {
    @TwoPhaseBusinessAction(name = "DubboTccActionTwo", commitMethod = "commit", rollbackMethod = "rollback")
    public boolean prepare(BusinessActionContext actionContext,
                           @BusinessActionContextParameter(paramName = "b") String b,
                           List<String> list);

    public boolean commit(BusinessActionContext actionContext);

    public boolean rollback(BusinessActionContext actionContext);
}
The business system calls both services, and the two must either both succeed or both fail:
@GlobalTransactional
public String doTransactionCommit() {
    // First TCC participant
    boolean result = tccActionOne.prepare(null, 1);
    if (!result) {
        throw new RuntimeException("TccActionOne failed.");
    }

    // Second TCC participant
    List<String> list = new ArrayList<>();
    list.add("c1");
    list.add("c2");
    result = tccActionTwo.prepare(null, "two", list);
    if (!result) {
        throw new RuntimeException("TccActionTwo failed.");
    }

    return RootContext.getXID();
}
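That is an example of a global transaction implemented with Seata's TCC mode. TCC mode also uses the @GlobalTransactional annotation to start the global transaction, while the TCC interfaces of service A and service B are the transaction participants. Seata treats each TCC interface as a Resource, also called a TCC Resource.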
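When Seata starts, it scans and parses the TCC interfaces. If a TCC interface is on the publishing (provider) side, Seata registers the TCC Resource with the TC at startup, and each TCC Resource has a resource ID. If a TCC interface is on the calling side, Seata proxies the caller: as in AT mode, the proxy intercepts calls to the TCC interface, so every call to the Try method first registers a branch transaction with the TC and then performs the original RPC call.
When the global transaction is resolved to commit or roll back, the TC uses the resource ID registered for each branch to call back the corresponding participant service and run the TCC Resource's Confirm or rollback method. This is how everything either succeeds together or fails together.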
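The resource-ID round trip described above can be pictured with a toy coordinator. This is a simplified sketch, not Seata's code: `TccParticipant`, `ToyCoordinator` and their methods are hypothetical. There is no networking or persistence, and branches are called back in registration order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical participant contract: one Confirm and one Cancel callback per TCC Resource.
interface TccParticipant {
    boolean commit(long branchId);

    boolean rollback(long branchId);
}

// Toy coordinator: resources are registered by ID at startup, each Try call registers
// a branch under a global transaction (xid), and the global decision is dispatched
// back to the participant that owns each branch's resource ID.
class ToyCoordinator {
    private static final class Branch {
        final long branchId;
        final String resourceId;

        Branch(long branchId, String resourceId) {
            this.branchId = branchId;
            this.resourceId = resourceId;
        }
    }

    private final Map<String, TccParticipant> resources = new HashMap<>();
    private final Map<String, List<Branch>> branchesByXid = new HashMap<>();
    private long nextBranchId = 1;

    // Provider side: publish a TCC Resource under its resource ID.
    void registerResource(String resourceId, TccParticipant participant) {
        resources.put(resourceId, participant);
    }

    // Caller-side proxy: register a branch before the Try RPC is sent.
    long registerBranch(String xid, String resourceId) {
        if (!resources.containsKey(resourceId)) {
            throw new IllegalArgumentException("unknown resource: " + resourceId);
        }
        long branchId = nextBranchId++;
        branchesByXid.computeIfAbsent(xid, k -> new ArrayList<>()).add(new Branch(branchId, resourceId));
        return branchId;
    }

    // Dispatch the global decision; true only if every branch callback succeeded.
    boolean finish(String xid, boolean commit) {
        boolean allOk = true;
        for (Branch b : branchesByXid.getOrDefault(xid, Collections.emptyList())) {
            TccParticipant p = resources.get(b.resourceId);
            boolean ok = commit ? p.commit(b.branchId) : p.rollback(b.branchId);
            allOk &= ok;
        }
        return allOk;
    }
}
```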
How Seata Implements TCC
GlobalTransactionScanner
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
        implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
    // ...
}
It implements InitializingBean, so afterPropertiesSet runs once the bean has been initialized:
@Override
public void afterPropertiesSet() {
    if (disableGlobalTransaction) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global transaction is disabled.");
        }
        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                (ConfigurationChangeListener) this);
        return;
    }
    if (initialized.compareAndSet(false, true)) {
        initClient();
    }
}
Initialize the TM and RM clients:
private void initClient() {
    // init TM
    TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
    // init RM
    RMClient.init(applicationId, txServiceGroup);
    registerSpringShutdownHook();
}
Initialize the Netty client:
public void init() {
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                MAX_MERGE_SEND_THREAD,
                KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    clientBootstrap.start();
}
A thread that checks the connection to seata-server and reconnects every 10 seconds.
A message-sending thread that keeps taking data out of basketMap:
protected final ConcurrentHashMap<String/*serverAddress*/, BlockingQueue<RpcMessage>> basketMap = new ConcurrentHashMap<>();
private class MergedSendRunnable implements Runnable {

    @Override
    public void run() {
        while (true) {
            synchronized (mergeLock) {
                try {
                    mergeLock.wait(MAX_MERGE_SEND_MILLS);
                } catch (InterruptedException e) {
                }
            }
            isSending = true;
            basketMap.forEach((address, basket) -> {
                if (basket.isEmpty()) {
                    return;
                }

                MergedWarpMessage mergeMessage = new MergedWarpMessage();
                while (!basket.isEmpty()) {
                    RpcMessage msg = basket.poll();
                    mergeMessage.msgs.add((AbstractMessage) msg.getBody());
                    mergeMessage.msgIds.add(msg.getId());
                }
                if (mergeMessage.msgIds.size() > 1) {
                    printMergeMessageLog(mergeMessage);
                }
                Channel sendChannel = null;
                try {
                    // send batch message is sync request, but there is no need to get the return value.
                    // Since the messageFuture has been created before the message is placed in basketMap,
                    // the return value will be obtained in ClientOnResponseProcessor.
                    sendChannel = clientChannelManager.acquireChannel(address);
                    AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
                } catch (FrameworkException e) {
                    if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
                        destroyChannel(address, sendChannel);
                    }
                    // fast fail
                    for (Integer msgId : mergeMessage.msgIds) {
                        MessageFuture messageFuture = futures.remove(msgId);
                        if (messageFuture != null) {
                            messageFuture.setResultMessage(null);
                        }
                    }
                    LOGGER.error("client merge call failed: {}", e.getMessage(), e);
                }
            });
            isSending = false;
        }
    }
}
When a message needs to be sent, it is put into basketMap and this thread sends it, so basketMap acts as a send buffer.
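The buffering idea can be illustrated with a simplified, single-pass version of the sender loop. Several simplifications apply:
- `BasketBatcher`, `offer` and `drainOnce` are hypothetical names.
- Messages are plain strings instead of RpcMessage.
- There is no mergeLock/wait timing.
- The merged batch is returned to the caller instead of being wrapped in a MergedWarpMessage and sent over Netty.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified sketch of basket batching: callers enqueue messages per server
// address, and one sender pass drains every non-empty basket into a single batch.
class BasketBatcher {
    // One queue ("basket") per server address, like basketMap above.
    private final Map<String, BlockingQueue<String>> basketMap = new ConcurrentHashMap<>();

    // Producer side: put the message into the basket of its target server.
    void offer(String serverAddress, String message) {
        basketMap.computeIfAbsent(serverAddress, k -> new LinkedBlockingQueue<>()).offer(message);
    }

    // Sender side: one pass over all baskets, draining each into one merged batch.
    Map<String, List<String>> drainOnce() {
        Map<String, List<String>> batches = new HashMap<>();
        basketMap.forEach((address, basket) -> {
            if (basket.isEmpty()) {
                return; // skip this address, like 'continue'
            }
            List<String> merged = new ArrayList<>();
            basket.drainTo(merged);
            batches.put(address, merged);
        });
        return batches;
    }
}
```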
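Resource Registration
RM Registration
Every service can be both a transaction initiator and a transaction participant, so every service is both a TM and an RM.
GlobalTransactionScanner also extends AbstractAutoProxyCreator,
so wrapIfNecessary runs when each bean is loaded: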
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    try {
        synchronized (PROXYED_SET) {
            if (PROXYED_SET.contains(beanName)) {
                return bean;
            }
            interceptor = null;
            //check TCC proxy
            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        (ConfigurationChangeListener) interceptor);
            } else {
                Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                    return bean;
                }

                if (globalTransactionalInterceptor == null) {
                    globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                    ConfigurationCache.addConfigListener(
                            ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener) globalTransactionalInterceptor);
                }
                interceptor = globalTransactionalInterceptor;
            }

            LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
            if (!AopUtils.isAopProxy(bean)) {
                bean = super.wrapIfNecessary(bean, beanName, cacheKey);
            } else {
                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                for (Advisor avr : advisor) {
                    advised.addAdvisor(0, avr);
                }
            }
            PROXYED_SET.add(beanName);
            return bean;
        }
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}
public static boolean isTccAutoProxy(Object bean, String beanName, ApplicationContext applicationContext) {
    boolean isRemotingBean = parserRemotingServiceInfo(bean, beanName);
    //get RemotingBean description
    RemotingDesc remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
    //is remoting bean
    if (isRemotingBean) {
        if (remotingDesc != null && remotingDesc.getProtocol() == Protocols.IN_JVM) {
            //LocalTCC
            return isTccProxyTargetBean(remotingDesc);
        } else {
            // sofa:reference / dubbo:reference, factory bean
            return false;
        }
    } else {
        if (remotingDesc == null) {
            //check FactoryBean
            if (isRemotingFactoryBean(bean, beanName, applicationContext)) {
                remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
                return isTccProxyTargetBean(remotingDesc);
            } else {
                return false;
            }
        } else {
            return isTccProxyTargetBean(remotingDesc);
        }
    }
}
If a method carries the @TwoPhaseBusinessAction annotation, its resource is registered with seata-server:
public RemotingDesc parserRemotingServiceInfo(Object bean, String beanName, RemotingParser remotingParser) {
    RemotingDesc remotingBeanDesc = remotingParser.getServiceDesc(bean, beanName);
    if (remotingBeanDesc == null) {
        return null;
    }
    remotingServiceMap.put(beanName, remotingBeanDesc);

    Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();
    Method[] methods = interfaceClass.getMethods();
    if (remotingParser.isService(bean, beanName)) {
        try {
            //service bean, registry resource
            Object targetBean = remotingBeanDesc.getTargetBean();
            for (Method m : methods) {
                TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
                if (twoPhaseBusinessAction != null) {
                    TCCResource tccResource = new TCCResource();
                    tccResource.setActionName(twoPhaseBusinessAction.name());
                    tccResource.setTargetBean(targetBean);
                    tccResource.setPrepareMethod(m);
                    tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
                    tccResource.setCommitMethod(ReflectionUtil.getMethod(interfaceClass, twoPhaseBusinessAction.commitMethod(),
                            new Class[] {BusinessActionContext.class}));
                    tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
                    tccResource.setRollbackMethod(ReflectionUtil.getMethod(interfaceClass, twoPhaseBusinessAction.rollbackMethod(),
                            new Class[] {BusinessActionContext.class}));
                    //registry tcc resource
                    DefaultResourceManager.get().registerResource(tccResource);
                }
            }
        } catch (Throwable t) {
            throw new FrameworkException(t, "parser remoting service error");
        }
    }
    if (remotingParser.isReference(bean, beanName)) {
        //reference bean, TCC proxy
        remotingBeanDesc.setReference(true);
    }
    return remotingBeanDesc;
}
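The registration message is assembled and sent to seata-server.
As the comment //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC in wrapIfNecessary indicates, the TCC interceptor targets reference beans and LocalTCC beans.
Since this bean is a provider-side RM, it is not proxied.
Next, the server-side logic for registering an RM: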
io.seata.core.rpc.netty.ChannelManager#registerRMChannel
public static void registerRMChannel(RegisterRMRequest resourceManagerRequest, Channel channel)
        throws IncompatibleVersionException {
    Version.checkVersion(resourceManagerRequest.getVersion());
    Set<String> dbkeySet = dbKeytoSet(resourceManagerRequest.getResourceIds());
    RpcContext rpcContext;
    if (!IDENTIFIED_CHANNELS.containsKey(channel)) {
        rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.RMROLE, resourceManagerRequest.getVersion(),
                resourceManagerRequest.getApplicationId(), resourceManagerRequest.getTransactionServiceGroup(),
                resourceManagerRequest.getResourceIds(), channel);
        rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
    } else {
        rpcContext = IDENTIFIED_CHANNELS.get(channel);
        rpcContext.addResources(dbkeySet);
    }
    if (dbkeySet == null || dbkeySet.isEmpty()) {
        return;
    }
    for (String resourceId : dbkeySet) {
        String clientIp;
        ConcurrentMap<Integer, RpcContext> portMap = CollectionUtils.computeIfAbsent(RM_CHANNELS, resourceId, key -> new ConcurrentHashMap<>())
                .computeIfAbsent(resourceManagerRequest.getApplicationId(), key -> new ConcurrentHashMap<>())
                .computeIfAbsent(clientIp = ChannelUtil.getClientIpFromChannel(channel), key -> new ConcurrentHashMap<>());

        rpcContext.holdInResourceManagerChannels(resourceId, portMap);
        updateChannelsResource(resourceId, clientIp, resourceManagerRequest.getApplicationId());
    }
}
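The RM information is stored in RM_CHANNELS, which records the connection details keyed by resource ID, then application ID, then client IP, then port.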
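The four-level layout of RM_CHANNELS can be mimicked with nested ConcurrentMaps. In this hypothetical sketch a String stands in for RpcContext. The `anyChannelFor` method only approximates the idea that the TC can later find a registered connection from a branch's resource ID; Seata's real channel selection is more involved.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch of the RM_CHANNELS layout: resourceId -> applicationId -> clientIp -> port -> connection.
// A String stands in for Seata's RpcContext; names are illustrative.
class RmChannelRegistry {
    private final ConcurrentMap<String, ConcurrentMap<String,
            ConcurrentMap<String, ConcurrentMap<Integer, String>>>> rmChannels = new ConcurrentHashMap<>();

    // Same computeIfAbsent chain as registerRMChannel, ending with the port-level entry.
    void register(String resourceId, String applicationId, String clientIp, int port, String channel) {
        rmChannels.computeIfAbsent(resourceId, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(applicationId, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(clientIp, k -> new ConcurrentHashMap<>())
                .put(port, channel);
    }

    // Returns any connection registered for the resource ID, or null if none exists.
    String anyChannelFor(String resourceId) {
        ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, String>>> byApp = rmChannels.get(resourceId);
        if (byApp == null) {
            return null;
        }
        for (ConcurrentMap<String, ConcurrentMap<Integer, String>> byIp : byApp.values()) {
            for (ConcurrentMap<Integer, String> byPort : byIp.values()) {
                if (!byPort.isEmpty()) {
                    return byPort.values().iterator().next();
                }
            }
        }
        return null;
    }
}
```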
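TM Registration
The client registers its TM information.
Creating the necessary proxies
In io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary:
If the bean is a reference bean annotated with @TwoPhaseBusinessAction, it must be proxied for further enhancement.
It is proxied with TccActionInterceptor (in effect, one more proxy layered on top of Dubbo's own proxy for the service reference).
Classes that initiate transactions with @GlobalTransactional are also proxied: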
private boolean existsAnnotation(Class<?>[] classes) {
    if (CollectionUtils.isNotEmpty(classes)) {
        for (Class<?> clazz : classes) {
            if (clazz == null) {
                continue;
            }
            GlobalTransactional trxAnno = clazz.getAnnotation(GlobalTransactional.class);
            if (trxAnno != null) {
                return true;
            }
            Method[] methods = clazz.getMethods();
            for (Method method : methods) {
                trxAnno = method.getAnnotation(GlobalTransactional.class);
                if (trxAnno != null) {
                    return true;
                }

                GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
                if (lockAnno != null) {
                    return true;
                }
            }
        }
    }
    return false;
}
These classes are proxied with GlobalTransactionalInterceptor.
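The existsAnnotation check boils down to plain Java reflection. Below is a minimal sketch with a hypothetical `@DemoGlobalTx` annotation standing in for @GlobalTransactional; the @GlobalLock check is left out. The annotation needs RUNTIME retention, otherwise getAnnotation() cannot see it.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-in for @GlobalTransactional; must be retained at RUNTIME.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface DemoGlobalTx {
}

// Same idea as existsAnnotation: a bean needs the global-transaction proxy if its
// class, or any of its public methods, carries the annotation.
final class ProxyDecision {
    static boolean needsProxy(Class<?> clazz) {
        if (clazz.getAnnotation(DemoGlobalTx.class) != null) {
            return true;
        }
        for (Method m : clazz.getMethods()) {
            if (m.getAnnotation(DemoGlobalTx.class) != null) {
                return true;
            }
        }
        return false;
    }
}

// Example beans used to exercise the check.
class OrderFacade {
    @DemoGlobalTx
    public void createOrder() {
    }
}

class PlainService {
    public void doWork() {
    }
}
```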
Entering the Global Transaction Interceptor
Calls pass through GlobalTransactionalInterceptor:
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class<?> targetClass =
            methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
        boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
        if (!localDisable) {
            if (globalTransactionalAnnotation != null) {
                return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
            } else if (globalLockAnnotation != null) {
                return handleGlobalLock(methodInvocation, globalLockAnnotation);
            }
        }
    }
    return methodInvocation.proceed();
}
If the @GlobalTransactional annotation is present, the global transaction is handled.
Entering the transaction execution method
io.seata.tm.api.TransactionalTemplate#execute
public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. Get transactionInfo
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();

    // 1.2 Handle the transaction propagation.
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        switch (propagation) {
            case NOT_SUPPORTED:
                // If transaction is existing, suspend it.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                }
                // Execute without transaction and return.
                return business.execute();
            case REQUIRES_NEW:
                // If transaction is existing, suspend it, and then begin new transaction.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                    tx = GlobalTransactionContext.createNew();
                }
                // Continue and execute with new transaction
                break;
            case SUPPORTS:
                // If transaction is not existing, execute without transaction.
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                // Continue and execute with new transaction
                break;
            case REQUIRED:
                // If current transaction is existing, execute with current transaction,
                // else continue and execute with new transaction.
                break;
            case NEVER:
                // If transaction is existing, throw exception.
                if (existingTransaction(tx)) {
                    throw new TransactionException(
                            String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s",
                                    tx.getXid()));
                } else {
                    // Execute without transaction and return.
                    return business.execute();
                }
            case MANDATORY:
                // If transaction is not existing, throw exception.
                if (notExistingTransaction(tx)) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                // Continue and execute with current transaction.
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
        if (tx == null) {
            tx = GlobalTransactionContext.createNew();
        }

        // set current tx config to holder
        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

        try {
            // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
            //    else do nothing. Of course, the hooks will still be triggered.
            beginTransaction(txInfo, tx);

            Object rs;
            try {
                // Do Your Business
                rs = business.execute();
            } catch (Throwable ex) {
                // 3. The needed business exception to rollback.
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            resumeGlobalLockConfig(previousConfig);
            triggerAfterCompletion();
            cleanUp();
        }
    } finally {
        // If the transaction is suspended, resume it.
        if (suspendedResourcesHolder != null) {
            tx.resume(suspendedResourcesHolder);
        }
    }
}
It reads the current transaction information and decides what to do based on the transaction's propagation attribute.
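The propagation switch above can be summarized as a decision table. This sketch uses hypothetical enum names and only reports what would happen for each combination of propagation setting and "a global transaction already exists". Suspension, resumption and the begin request to the TC are not modeled.

```java
// Hypothetical mirror of the propagation options handled by the switch above.
enum DemoPropagation { REQUIRED, REQUIRES_NEW, NOT_SUPPORTED, SUPPORTS, NEVER, MANDATORY }

// What the template ends up doing for a given combination.
enum Outcome { RUN_WITHOUT_TX, JOIN_EXISTING, BEGIN_NEW, SUSPEND_AND_BEGIN_NEW, SUSPEND_AND_RUN_WITHOUT_TX, THROW }

final class PropagationSketch {
    // `txExists` corresponds to existingTransaction(tx) in TransactionalTemplate.
    static Outcome decide(DemoPropagation p, boolean txExists) {
        switch (p) {
            case NOT_SUPPORTED:
                return txExists ? Outcome.SUSPEND_AND_RUN_WITHOUT_TX : Outcome.RUN_WITHOUT_TX;
            case REQUIRES_NEW:
                return txExists ? Outcome.SUSPEND_AND_BEGIN_NEW : Outcome.BEGIN_NEW;
            case SUPPORTS:
                return txExists ? Outcome.JOIN_EXISTING : Outcome.RUN_WITHOUT_TX;
            case REQUIRED:
                return txExists ? Outcome.JOIN_EXISTING : Outcome.BEGIN_NEW;
            case NEVER:
                return txExists ? Outcome.THROW : Outcome.RUN_WITHOUT_TX;
            case MANDATORY:
                return txExists ? Outcome.JOIN_EXISTING : Outcome.THROW;
            default:
                throw new IllegalArgumentException("Not supported: " + p);
        }
    }
}
```

"Join existing" works because an existing transaction gives the current caller the Participant role, and beginTransaction then sends nothing to the TC.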
Starting the Global Transaction
beginTransaction(txInfo, tx);
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
    if (response.getResultCode() == ResultCode.Failed) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    }
    return response.getXid();
}
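This sends the request to begin the global transaction. If this is the first connection to seata-server, the TM is also registered.
Server-side logic
Registering the TM channel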
io.seata.core.rpc.netty.ChannelManager#registerTMChannel
public static void registerTMChannel(RegisterTMRequest request, Channel channel)
        throws IncompatibleVersionException {
    Version.checkVersion(request.getVersion());
    RpcContext rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.TMROLE, request.getVersion(),
            request.getApplicationId(),
            request.getTransactionServiceGroup(),
            null, channel);
    rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
    String clientIdentified = rpcContext.getApplicationId() + Constants.CLIENT_ID_SPLIT_CHAR
            + ChannelUtil.getClientIpFromChannel(channel);
    ConcurrentMap<Integer, RpcContext> clientIdentifiedMap = CollectionUtils.computeIfAbsent(TM_CHANNELS,
            clientIdentified, key -> new ConcurrentHashMap<>());
    rpcContext.holdInClientChannels(clientIdentifiedMap);
}
Starting the global transaction
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
    GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
            timeout);
    MDC.put(RootContext.MDC_KEY_XID, session.getXid());
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

    session.begin();

    // transaction start event
    eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
            session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));

    return session.getXid();
}
A GlobalSession is created and its XID is returned.
The server also manages the session by putting it into sessionMap:
@Override
public void addGlobalSession(GlobalSession session) throws TransactionException {
    super.addGlobalSession(session);
    sessionMap.put(session.getXid(), session);
}
Executing the Business Logic
rs = business.execute();
The business code starts running. In TCC mode this usually calls the prepare method of each business system.
As mentioned earlier, reference services are proxied with TccActionInterceptor:
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
    // ... (omitted in this excerpt: method, xid, businessAction and previousBranchType are resolved here)
    try {
        Object[] methodArgs = invocation.getArguments();
        //Handler the TCC Aspect
        Map<String, Object> ret = actionInterceptorHandler.proceed(method, methodArgs, xid, businessAction,
                invocation::proceed);
        //return the final result
        return ret.get(Constants.TCC_METHOD_RESULT);
    }
    finally {
        //if not TCC, unbind branchType
        if (BranchType.TCC != previousBranchType) {
            RootContext.unbindBranchType();
        }
        //MDC remove branchId
        MDC.remove(RootContext.MDC_KEY_BRANCH_ID);
    }
    // ... (reached only when the call is not handled as a TCC Try; the enclosing check is omitted in this excerpt)
    return invocation.proceed();
}

protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction,
                                     BusinessActionContext actionContext) {
    String actionName = actionContext.getActionName();
    String xid = actionContext.getXid();
    Map<String, Object> context = fetchActionRequestContext(method, arguments);
    context.put(Constants.ACTION_START_TIME, System.currentTimeMillis());

    //init business context
    initBusinessContext(context, method, businessAction);
    //Init running environment context
    initFrameworkContext(context);
    actionContext.setActionContext(context);

    //init applicationData
    Map<String, Object> applicationContext = new HashMap<>(4);
    applicationContext.put(Constants.TCC_ACTION_CONTEXT, context);
    String applicationContextStr = JSON.toJSONString(applicationContext);
    try {
        //registry branch record
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid,
                applicationContextStr, null);
        return String.valueOf(branchId);
    } catch (Throwable t) {
        String msg = String.format("TCC branch Register error, xid: %s", xid);
        LOGGER.error(msg, t);
        throw new FrameworkException(t, msg);
    }
}
A branch transaction has to be started, which means sending the transaction information to the server, for example:
{
  "actionContext": {
    "a": 1,
    "action-start-time": 1659771487917,
    "sys::prepare": "prepare",
    "sys::rollback": "rollback",
    "sys::commit": "commit",
    "host-name": "10.100.100.27",
    "actionName": "DubboTccActionOne"
  }
}
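This information is read from the annotation on the interface.
At startup, the RM also registered the matching information with the server.
Server-side logic for starting a branch transaction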
io.seata.server.coordinator.AbstractCore#branchRegister
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                           String applicationData, String lockKeys) throws TransactionException {
    GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
    return SessionHolder.lockAndExecute(globalSession, () -> {
        globalSessionStatusCheck(globalSession);
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
                applicationData, lockKeys, clientId);
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
        branchSessionLock(globalSession, branchSession);
        try {
            globalSession.addBranch(branchSession);
        } catch (RuntimeException ex) {
            branchSessionUnlock(branchSession);
            throw new BranchTransactionException(FailedToAddBranch, String
                    .format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
                            branchSession.getBranchId()), ex);
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
                    globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
        }
        return branchSession.getBranchId();
    });
}
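A BranchSession is created and added under the GlobalSession. Pay attention to the call branchSessionLock(globalSession, branchSession);
This method turns out to be implemented only for AT mode, which means TCC mode takes no database locks here.
Once the branch transaction is registered, the Dubbo service is called remotely to execute the business logic.
Exception Rollback
- Exception in the prepare phase
A prepare-phase exception here means an exception caused by the business code. The transaction initiator (TM) then sends a rollback request: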
@Override
public void rollback() throws TransactionException {
    if (role == GlobalTransactionRole.Participant) {
        // Participant has no responsibility of rollback
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
        }
        return;
    }
    assertXIDNotNull();

    int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
    try {
        while (retry > 0) {
            try {
                status = transactionManager.rollback(xid);
                break;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                retry--;
                if (retry == 0) {
                    throw new TransactionException("Failed to report global rollback", ex);
                }
            }
        }
    } finally {
        if (xid.equals(RootContext.getXID())) {
            suspend();
        }
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("[{}] rollback status: {}", xid, status);
    }
}

@Override
public GlobalStatus rollback(String xid) throws TransactionException {
    GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
    globalRollback.setXid(xid);
    GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
    return response.getGlobalStatus();
}
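As you can see, a failed rollback request is retried, up to a retry count that defaults to 5: each exception triggers another attempt. The request is synchronous and returns only once the server confirms either failure or success.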
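The TM-side retry loop generalizes into a small helper. This is a hedged sketch: `ReportRetry.callWithRetry` is a hypothetical name, and the Supplier stands in for the synchronous call to the TC. As in the loop above, the call is attempted at most `retryCount` times before the last failure is rethrown.

```java
import java.util.function.Supplier;

// Generic version of the TM-side loop: retry a synchronous report to the TC and
// rethrow once the retry budget is used up. Not a Seata API.
final class ReportRetry {
    static <T> T callWithRetry(Supplier<T> report, int retryCount) {
        int retry = retryCount;
        while (true) {
            try {
                return report.get(); // success ends the loop, like 'break' above
            } catch (RuntimeException ex) {
                retry--;
                if (retry <= 0) {
                    throw new IllegalStateException("Failed to report to TC after " + retryCount + " attempts", ex);
                }
            }
        }
    }
}
```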
Server-side logic
io.seata.server.coordinator.DefaultCore#rollback
@Override
public GlobalStatus rollback(String xid) throws TransactionException {
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // just lock changeStatus
    boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
        globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            globalSession.changeStatus(GlobalStatus.Rollbacking);
            return true;
        }
        return false;
    });
    if (!shouldRollBack) {
        return globalSession.getStatus();
    }

    doGlobalRollback(globalSession, false);
    return globalSession.getStatus();
}
The GlobalSession status is changed to GlobalStatus.Rollbacking,
and then doGlobalRollback is entered.
It loops over the branch transactions and rolls each one back:
Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {
    BranchStatus currentBranchStatus = branchSession.getStatus();
    if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
        globalSession.removeBranch(branchSession);
        return CONTINUE;
    }
    try {
        BranchStatus branchStatus = branchRollback(globalSession, branchSession);
        switch (branchStatus) {
            case PhaseTwo_Rollbacked:
                globalSession.removeBranch(branchSession);
                LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                return CONTINUE;
            case PhaseTwo_RollbackFailed_Unretryable:
                SessionHelper.endRollbackFailed(globalSession);
                LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                return false;
            default:
                LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                if (!retrying) {
                    globalSession.queueToRetryRollback();
                }
                return false;
        }
    } catch (Exception ex) {
        StackTraceLogger.error(LOGGER, ex,
                "Rollback branch transaction exception, xid = {} branchId = {} exception = {}",
                new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});
        if (!retrying) {
            globalSession.queueToRetryRollback();
        }
        throw new TransactionException(ex);
    }
});

@Override
public BranchStatus branchRollback(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
    try {
        BranchRollbackRequest request = new BranchRollbackRequest();
        request.setXid(branchSession.getXid());
        request.setBranchId(branchSession.getBranchId());
        request.setResourceId(branchSession.getResourceId());
        request.setApplicationData(branchSession.getApplicationData());
        request.setBranchType(branchSession.getBranchType());
        return branchRollbackSend(request, globalSession, branchSession);
    } catch (IOException | TimeoutException e) {
        throw new BranchTransactionException(FailedToSendBranchRollbackRequest,
                String.format("Send branch rollback failed, xid = %s branchId = %s",
                        branchSession.getXid(), branchSession.getBranchId()), e);
    }
}
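A rollback message is sent to the RM that owns each branch's resource. Because the RM registered its resources with seata-server, seata-server can find the right RM from the resource. This message is also synchronous.
The RM uses the return value of the rollback method to decide whether the rollback succeeded: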
return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;
Each rollback status is handled differently:
switch (branchStatus) {
    // Rollback succeeded: remove the branch transaction
    case PhaseTwo_Rollbacked:
        globalSession.removeBranch(branchSession);
        LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
        return CONTINUE;
    // Rollback failed and cannot be retried: end the global session as rollback-failed
    case PhaseTwo_RollbackFailed_Unretryable:
        SessionHelper.endRollbackFailed(globalSession);
        LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
        return false;
    default:
        // Rollback failed: put the globalSession into the retry queue so the rollback is retried
        LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
        if (!retrying) {
            globalSession.queueToRetryRollback();
        }
        return false;
}
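The per-branch handling can be modeled without any Seata types. This simplified sketch uses hypothetical enums and class names and follows these rules:
- Branches are visited in reverse registration order, as getReverseSortedBranches suggests.
- A branch that rolls back successfully is removed.
- A non-retryable failure ends the whole rollback.
- Any other failure stops the pass so the session can be retried later.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins for the branch statuses handled above.
enum DemoBranchStatus { PHASE_TWO_ROLLBACKED, ROLLBACK_FAILED_RETRYABLE, ROLLBACK_FAILED_UNRETRYABLE }

// Outcome of one pass over the branches.
enum RollbackResult { DONE, QUEUED_FOR_RETRY, FAILED_FOR_GOOD }

final class RollbackLoop {
    // `branches` holds branch IDs in registration order; rolled-back branches are removed from it.
    static RollbackResult rollbackAll(List<Long> branches, Function<Long, DemoBranchStatus> sendRollback) {
        List<Long> reversed = new ArrayList<>(branches);
        Collections.reverse(reversed); // last registered branch is rolled back first
        for (Long branchId : reversed) {
            switch (sendRollback.apply(branchId)) {
                case PHASE_TWO_ROLLBACKED:
                    branches.remove(branchId); // remove(Object), not remove(int index)
                    break;
                case ROLLBACK_FAILED_UNRETRYABLE:
                    return RollbackResult.FAILED_FOR_GOOD;
                default:
                    return RollbackResult.QUEUED_FOR_RETRY;
            }
        }
        return RollbackResult.DONE;
    }
}
```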
Next, the scheduled task that processes the retry-rollback queue:
protected void handleRetryRollbacking() {
    Collection<GlobalSession> rollbackingSessions = SessionHolder.getRetryRollbackingSessionManager().allSessions();
    if (CollectionUtils.isEmpty(rollbackingSessions)) {
        return;
    }
    long now = System.currentTimeMillis();
    SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {
        try {
            // prevent repeated rollback
            if (rollbackingSession.getStatus().equals(GlobalStatus.Rollbacking) && !rollbackingSession.isDeadSession()) {
                //The function of this 'return' is 'continue'.
                return;
            }
            if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT.toMillis(), rollbackingSession.getBeginTime())) {
                if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) {
                    rollbackingSession.clean();
                }
                /**
                 * Prevent thread safety issues
                 */
                SessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(rollbackingSession);
                LOGGER.info("Global transaction rollback retry timeout and has removed [{}]", rollbackingSession.getXid());
                //The function of this 'return' is 'continue'.
                return;
            }
            rollbackingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
            core.doGlobalRollback(rollbackingSession, true);
        } catch (TransactionException ex) {
            LOGGER.info("Failed to retry rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage());
        }
    });
}
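By default the retry never times out. The timeout can be configured through this key:
/**
 * The constant MAX_ROLLBACK_RETRY_TIMEOUT.
 */
String MAX_ROLLBACK_RETRY_TIMEOUT = SERVER_PREFIX + "maxRollbackRetryTimeout";
In other words, if rollback keeps failing, it is retried indefinitely.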
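The timeout check used by handleRetryRollbacking can be expressed as a small predicate. This is a hedged sketch: the class and parameter names are hypothetical. It assumes that a negative configured value means "retry forever", which is consistent with the never-times-out default described above.

```java
// Sketch of the retry-timeout predicate; assumes a negative maxRetryTimeoutMillis
// disables the timeout entirely. Not Seata's actual method.
final class RetryTimeout {
    static boolean isRetryTimeout(long nowMillis, long maxRetryTimeoutMillis, long beginTimeMillis) {
        return maxRetryTimeoutMillis >= 0 && nowMillis - beginTimeMillis > maxRetryTimeoutMillis;
    }
}
```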
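Back in the TM's rollback method: as soon as it receives a reply from seata-server, it leaves the loop. The rollback may not be complete at that point, but seata-server takes over and keeps retrying it.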
while (retry > 0) {
    try {
        status = transactionManager.rollback(xid);
        break;
    } catch (Throwable ex) {
        LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
        retry--;
        if (retry == 0) {
            throw new TransactionException("Failed to report global rollback", ex);
        }
    }
}
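- Rollback on transaction timeout
The server detects transaction timeouts by checking the state of the global transactions.
The scheduled task io.seata.server.coordinator.DefaultCoordinator#timeoutCheck performs this check, every 5 seconds by default: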
public boolean isTimeout() {
    return (System.currentTimeMillis() - beginTime) > timeout;
}
Transaction Commit
A commit is always initiated by the TM:
// 4. everything is fine, commit.
commitTransaction(tx);
public void commit() throws TransactionException {
    if (role == GlobalTransactionRole.Participant) {
        // Participant has no responsibility of committing
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
        }
        return;
    }
    assertXIDNotNull();
    int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
    try {
        while (retry > 0) {
            try {
                status = transactionManager.commit(xid);
                break;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                retry--;
                if (retry == 0) {
                    throw new TransactionException("Failed to report global commit", ex);
                }
            }
        }
    } finally {
        if (xid.equals(RootContext.getXID())) {
            suspend();
        }
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("[{}] commit status: {}", xid, status);
    }
}
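The TM sends the request to commit the global transaction to seata-server, and failures are retried here as well.
Now the server-side commit logic: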
Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
    // if not retrying, skip the canBeCommittedAsync branches
    if (!retrying && branchSession.canBeCommittedAsync()) {
        return CONTINUE;
    }

    BranchStatus currentStatus = branchSession.getStatus();
    if (currentStatus == BranchStatus.PhaseOne_Failed) {
        globalSession.removeBranch(branchSession);
        return CONTINUE;
    }
    try {
        BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);

        switch (branchStatus) {
            case PhaseTwo_Committed:
                globalSession.removeBranch(branchSession);
                return CONTINUE;
            case PhaseTwo_CommitFailed_Unretryable:
                if (globalSession.canBeCommittedAsync()) {
                    LOGGER.error(
                            "Committing branch transaction[{}], status: PhaseTwo_CommitFailed_Unretryable, please check the business log.", branchSession.getBranchId());
                    return CONTINUE;
                } else {
                    SessionHelper.endCommitFailed(globalSession);
                    LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
                    return false;
                }
            default:
                if (!retrying) {
                    globalSession.queueToRetryCommit();
                    return false;
                }
                if (globalSession.canBeCommittedAsync()) {
                    LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
                            branchSession.getBranchId(), branchStatus);
                    return CONTINUE;
                } else {
                    LOGGER.error(
                            "Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
                    return false;
                }
        }
    } catch (Exception ex) {
        StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
                new String[] {branchSession.toString()});
        if (!retrying) {
            globalSession.queueToRetryCommit();
            throw new TransactionException(ex);
        }
    }
    return CONTINUE;
});
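As with rollback, the server walks through the branch transactions one by one and asks each of them to commit, which invokes the participant's commit method. A branch that commits successfully is removed from the global session. If a branch commit returns a retryable failure status or throws an exception, the global session is put into the retry queue (queueToRetryCommit) and the commit is retried later; a PhaseTwo_CommitFailed_Unretryable result instead ends the global commit as failed, unless the session can be committed asynchronously.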
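Three anomalies of the TCC transaction model
These three anomalies are unavoidable when implementing the TCC model, and a production system must guard against all of them.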
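1. Empty rollback
Definition: cancel is executed to roll back even though try was never called, or try did not complete successfully.
How should this be understood? cancel running when try was never called is easy to grasp: no resources were reserved, so there is nothing that can be rolled back.
What does "try did not complete successfully" mean?
Look at the pseudocode of the phase-one try method in the previous section. Because try runs in a local transaction, any exception thrown while try is executing rolls back try's local transaction (note that this is the rollback of try's own local transaction, not cancel). All of try's operations are therefore undone, and there is actually no need to call cancel.
In practice, however, once try throws an exception, cancel is always called to roll back, and that is what produces an empty rollback.
Solution:
The logic is simple: before cancel does anything, it must know whether try completed successfully.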
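The check above can be sketched in Java. This is a minimal, single-threaded illustration, not Seata code: the class EmptyRollbackGuardedStock, its methods and the xid:branchId key format are hypothetical. The in-memory set stands in for a record that a real service would write inside try's own local transaction, so that the record disappears whenever try rolls back.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical participant; names are illustrative, not Seata API.
class EmptyRollbackGuardedStock {
    // Branches whose try finished successfully. A real service would persist this
    // in the same local transaction as the freeze, so a failed try leaves no record.
    private final Set<String> triedBranches = new HashSet<>();
    int frozen = 0;

    boolean tryFreeze(String xid, long branchId, int num, boolean failMidway) {
        if (failMidway) {
            // Simulates an exception: try's local transaction rolls back, nothing is recorded.
            return false;
        }
        frozen += num;
        triedBranches.add(xid + ":" + branchId);
        return true;
    }

    boolean cancel(String xid, long branchId, int num) {
        String key = xid + ":" + branchId;
        if (!triedBranches.contains(key)) {
            // Empty rollback: try never succeeded, so there is nothing to release.
            // Report success so the coordinator stops retrying.
            return true;
        }
        frozen -= num;              // release the stock frozen by try
        triedBranches.remove(key);  // a second cancel now also becomes a no-op
        return true;
    }
}

class EmptyRollbackDemo {
    public static void main(String[] args) {
        EmptyRollbackGuardedStock stock = new EmptyRollbackGuardedStock();
        stock.tryFreeze("xid-1", 1L, 5, true); // try fails and rolls back locally
        stock.cancel("xid-1", 1L, 5);          // the coordinator still calls cancel
        System.out.println(stock.frozen);      // prints 0, not -5
    }
}
```

Because cancel reports success for an empty rollback, the global rollback can finish instead of being retried forever.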
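2. Idempotency
The TCC definition states that if confirm or cancel fails, it must be retried until it succeeds.
This is where idempotency comes in: confirm and cancel must be idempotent within the same global transaction.
Solution:
The logic is simple: to achieve idempotency, use an idempotency key to reject duplicate executions.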
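A minimal sketch of the idempotency-key approach, assuming an in-memory set in place of a persisted record; IdempotentConfirm and the xid:branchId:confirm key format are hypothetical. In a real service the key would be written in the same local transaction as the business update, otherwise a crash between the two could still release the stock twice.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical confirm handler; not Seata API.
class IdempotentConfirm {
    private final Set<String> done = new HashSet<>();
    int frozen;
    int completedOrders;

    IdempotentConfirm(int frozen) {
        this.frozen = frozen;
    }

    boolean confirm(String xid, long branchId, int num) {
        String key = xid + ":" + branchId + ":confirm";
        if (done.contains(key)) {
            // Already confirmed: a retry must not release the frozen stock again.
            return true;
        }
        frozen -= num;     // release the stock frozen by try
        completedOrders++; // mark the order as completed
        done.add(key);     // remember that this branch has been confirmed
        return true;
    }
}

class IdempotentConfirmDemo {
    public static void main(String[] args) {
        IdempotentConfirm confirm = new IdempotentConfirm(5);
        for (int attempt = 0; attempt < 3; attempt++) {
            confirm.confirm("xid-1", 7L, 5); // confirm may be delivered more than once
        }
        System.out.println(confirm.frozen + " " + confirm.completedOrders); // prints "0 1"
    }
}
```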
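3. Suspension (hanging)
Suspension happens when, because of a network problem, the RM does not receive the try instruction at first; the rollback is executed, and only afterwards does the RM receive the try instruction and reserve resources successfully. By then the global transaction has already ended, so the reserved resources can never be released.
When the transaction coordinator calls the phase-one try of a TCC service, the call may time out because of network congestion (for example a Dubbo call timeout). The transaction manager then triggers the phase-two rollback and calls the TCC service's cancel, and that cancel call does not time out.
After that, the phase-one try packet that was stuck in the network reaches the TCC service, so the phase-two cancel has run before the phase-one try. Once the TCC service executes the late try, it will never receive a phase-two confirm or cancel, and the TCC service is left suspended.
Solution:
The logic is simple: before try touches any resource, check whether cancel has already been executed; correspondingly, cancel must record its execution status after it runs.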
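A minimal sketch of the anti-suspension check, single-threaded and in-memory; AntiSuspensionStock and its methods are hypothetical. The point is the ordering: cancel leaves a marker even when try has not arrived yet, and try consults that marker before reserving anything. In a real service the check and the reservation must be atomic (for example by relying on a unique key in a database table), which this sketch does not attempt.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical participant; not Seata API.
class AntiSuspensionStock {
    private final Set<String> cancelled = new HashSet<>();
    private final Set<String> tried = new HashSet<>();
    int frozen;

    boolean tryFreeze(String xid, long branchId, int num) {
        String key = xid + ":" + branchId;
        if (cancelled.contains(key)) {
            // cancel already ran for this branch: reserving now would hang the resource
            return false;
        }
        frozen += num;
        tried.add(key);
        return true;
    }

    boolean cancel(String xid, long branchId, int num) {
        String key = xid + ":" + branchId;
        if (tried.remove(key)) {
            frozen -= num; // normal rollback of a successful try
        }
        // Record that cancel ran, even if try has not arrived yet.
        cancelled.add(key);
        return true;
    }
}

class AntiSuspensionDemo {
    public static void main(String[] args) {
        AntiSuspensionStock stock = new AntiSuspensionStock();
        stock.cancel("xid-1", 1L, 5);                       // cancel overtakes the delayed try
        boolean reserved = stock.tryFreeze("xid-1", 1L, 5); // the late try is rejected
        System.out.println(reserved + " " + stock.frozen);  // prints "false 0"
    }
}
```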
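4. Summary
There are many practical ways to handle these three anomalies; one is to maintain a transaction status table that records every phase each transaction has executed.
Idempotency: before executing confirm or cancel, query the status table to check whether confirm or cancel has already been executed for the current global transaction.
Empty rollback: before executing cancel, query the status table to check whether try succeeded for the current global transaction.
Suspension: before executing try, query the status table to check whether cancel has already been executed for the current global transaction.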
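Transaction status table (solving the three anomalies)
This article uses Seata 1.4.2, whose source code does not have this mechanism yet.
Seata 1.5.0 adds a transaction control table named tcc_fence_log to solve these problems. The useTCCFence attribute of the @TwoPhaseBusinessAction annotation mentioned in the previous section specifies whether the mechanism is enabled; it defaults to false.
The DDL for tcc_fence_log (MySQL syntax) is as follows: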
CREATE TABLE IF NOT EXISTS `tcc_fence_log`
(
    `xid`          VARCHAR(128) NOT NULL COMMENT 'global id',
    `branch_id`    BIGINT       NOT NULL COMMENT 'branch id',
    `action_name`  VARCHAR(64)  NOT NULL COMMENT 'action name',
    `status`       TINYINT      NOT NULL COMMENT 'status(tried:1;committed:2;rollbacked:3;suspended:4)',
    `gmt_create`   DATETIME(3)  NOT NULL COMMENT 'create time',
    `gmt_modified` DATETIME(3)  NOT NULL COMMENT 'update time',
    PRIMARY KEY (`xid`, `branch_id`),
    KEY `idx_gmt_modified` (`gmt_modified`),
    KEY `idx_status` (`status`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;
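Guaranteeing idempotency
When committing, the handler first checks whether tcc_fence_log already contains a record; if it does, it inspects the recorded status and returns accordingly. If the status is already STATUS_COMMITTED, the commit is not executed again, which guarantees idempotency. If tcc_fence_log has no record, one is inserted so that later retries can check it.
Rollback works the same way as commit; its logic lives in the rollbackFence method of the TCCFenceHandler class.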
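Preventing empty rollback
Seata's solution is to insert a record with status STATUS_TRIED into tcc_fence_log during the try phase. In the rollback phase it checks whether that record exists; if it does not, the rollback is not executed.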
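Preventing suspension
Seata solves this by having the rollback method first check whether tcc_fence_log contains a record for the current xid and branch. If there is none, it inserts a record with status STATUS_SUSPENDED and skips the rollback. Because (xid, branch_id) is the table's primary key, a try that arrives later cannot insert its own STATUS_TRIED record, so it does not reserve any resources.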
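The fence rules described in this section can be summarized in a small in-memory model. This is a hypothetical sketch, not Seata's TCCFenceHandler: the class FenceLogSketch, its methods and the Runnable callbacks are illustrative. It reuses the status codes from the tcc_fence_log COMMENT and keys records on (xid, branch_id) like the table's primary key. In Seata the fence record and the business method run in one local database transaction; here a HashMap stands in for the table, so the sketch is single-threaded only. How a commit without any record is treated is also an assumption of this sketch (it simply reports failure).

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for tcc_fence_log; NOT Seata's TCCFenceHandler.
// Status codes follow the table COMMENT: tried:1, committed:2, rollbacked:3, suspended:4.
class FenceLogSketch {
    static final int TRIED = 1, COMMITTED = 2, ROLLBACKED = 3, SUSPENDED = 4;

    // Key mirrors PRIMARY KEY (xid, branch_id).
    private final Map<String, Integer> log = new HashMap<>();

    private static String key(String xid, long branchId) {
        return xid + ":" + branchId;
    }

    // Phase one: insert TRIED first; an existing key means cancel already wrote SUSPENDED.
    boolean prepare(String xid, long branchId, Runnable tryAction) {
        if (log.putIfAbsent(key(xid, branchId), TRIED) != null) {
            return false; // anti-suspension: a late try must not reserve resources
        }
        tryAction.run();
        return true;
    }

    boolean commit(String xid, long branchId, Runnable commitAction) {
        Integer status = log.get(key(xid, branchId));
        if (status == null) {
            return false; // assumption of this sketch: nothing was tried, report failure
        }
        if (status == COMMITTED) {
            return true; // idempotency: already committed, do not commit again
        }
        if (status != TRIED) {
            return false; // rolled back or suspended: must not commit
        }
        log.put(key(xid, branchId), COMMITTED);
        commitAction.run();
        return true;
    }

    boolean rollback(String xid, long branchId, Runnable rollbackAction) {
        String k = key(xid, branchId);
        Integer status = log.get(k);
        if (status == null) {
            // Empty rollback and anti-suspension: record SUSPENDED, skip business rollback.
            log.put(k, SUSPENDED);
            return true;
        }
        if (status == ROLLBACKED || status == SUSPENDED) {
            return true; // idempotency: already handled
        }
        if (status == COMMITTED) {
            return false; // a committed branch cannot be rolled back
        }
        log.put(k, ROLLBACKED);
        rollbackAction.run();
        return true;
    }

    Integer statusOf(String xid, long branchId) {
        return log.get(key(xid, branchId));
    }
}

class FenceLogDemo {
    public static void main(String[] args) {
        FenceLogSketch fence = new FenceLogSketch();
        // Cancel overtakes a delayed try: SUSPENDED is recorded, business rollback skipped.
        fence.rollback("xid-1", 1L, () -> System.out.println("rollback business logic"));
        // The late try then hits the existing (xid, branch_id) key and reserves nothing.
        boolean tried = fence.prepare("xid-1", 1L, () -> System.out.println("try business logic"));
        System.out.println(tried + " " + fence.statusOf("xid-1", 1L)); // prints "false 4"
    }
}
```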