Analyzing Seata's Workflow Through Its Source Code

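Earlier posts covered the principles behind Seata's transaction modes and walked through a demo of AT mode.
This post follows the source code to explain Seata's workflow.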

Configuration file

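An earlier post explained how Seata's configuration center is implemented: https://blog.csdn.net/weixin_45661382/article/details/105539999
This section covers the configuration itself, using the default file type as the example.
file.conf has three parts:
1. transport
The transport part configures Netty, which the client and server use to communicate. It maps to NettyBaseConfig, NettyServerConfig, and NettyClientConfig in the io.seata.core.rpc.netty package.
2. service
Applies to the client only.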

# service configuration, only used in client side
service {
  #transaction service group mapping
  #my_test_tx_group ---> custom distributed transaction group name; default ---> the name defined for the registry in registry.conf, e.g. the application in eureka
  vgroupMapping.my_test_tx_group = "default"
  #only support when registry.type=file, please don't set multiple addresses
  #this setting has no effect in db mode
  default.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  #set to true to turn off Seata's distributed transactions
  disableGlobalTransaction = false
}
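
To make the mapping in the comments above concrete, here is a minimal sketch of the two-step lookup when registry.type=file: transaction group to cluster name, then cluster name to address list. The class GroupMappingSketch and the flattened dotted keys are assumptions made for illustration; this is not Seata's actual lookup code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that mimics the lookup described by the service block.
final class GroupMappingSketch {

    // Step 1: transaction group -> cluster name. Step 2: cluster name -> TC address list.
    static String resolveGroupList(Map<String, String> conf, String txServiceGroup) {
        String cluster = conf.get("service.vgroupMapping." + txServiceGroup);
        if (cluster == null) {
            throw new IllegalStateException("no vgroupMapping for " + txServiceGroup);
        }
        return conf.get("service." + cluster + ".grouplist");
    }

    // The values from the service block above, flattened into dotted keys.
    static Map<String, String> sampleConf() {
        Map<String, String> conf = new HashMap<>();
        conf.put("service.vgroupMapping.my_test_tx_group", "default");
        conf.put("service.default.grouplist", "127.0.0.1:8091");
        return conf;
    }
}
```

A transaction group without a vgroupMapping entry fails at step 1, which is why the group name configured on the client has to match the key in this block.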

The disableGlobalTransaction switch is read in io.seata.spring.annotation.GlobalTransactionScanner.
3. client
Applies to the client only.

#client transaction configuration, only used in client side
client {
  rm {
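    #upper limit of the buffer that holds commit notifications the RM receives from the TC
    asyncCommitBufferLimit = 10000
    lock {
      #interval between retries when checking or acquiring the global lock; default 10, in milliseconds
      retryInterval = 10
      #number of retries when checking or acquiring the global lock; default 30
      retryTimes = 30
      #lock policy when a branch transaction conflicts with another global transaction that is rolling back; default true, release the local lock first so the rollback can succeed
      retryPolicyBranchRollbackOnConflict = true
    }
    #number of retries when reporting the phase-one result to the TC; default 5
    reportRetryCount = 5
    #automatically refresh the table metadata held in the cache; default false
    tableMetaCheckEnable = false
    #whether to report phase-one success, true or false; default false since 1.1.0. true keeps the branch transaction lifecycle record complete, false improves performance considerably
    reportSuccessEnable = false
    sqlParserType = druid
  }
  tm {
    #number of retries when reporting the phase-one global commit result to the TC; default 1, a value greater than 1 is recommended
    commitRetryCount = 5
    #number of retries when reporting the phase-one global rollback result to the TC; default 1, a value greater than 1 is recommended
    rollbackRetryCount = 5
  }
  undo {
    #validate the data images during phase-two rollback; default true (enabled), false disables it
    dataValidation = true
    #undo log serialization; default jackson
    logSerialization = "jackson"
    #custom undo table name; default undo_log
    logTable = "undo_log"
  }
  log {
    #exception log output rate; default 100. Currently used for stack traces when an undo rollback fails: the stack is printed with a 1-in-100 probability. A failed rollback almost always means dirty data, so there is no need to fill the disk with stack traces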
    exceptionRate = 100
  }
}
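
The lock settings above describe a bounded retry loop: try, and on a lock conflict wait retryInterval milliseconds and try again, up to retryTimes retries. With the values shown, that is at most about 30 x 10 ms = 300 ms of waiting. The sketch below illustrates the idea only; LockRetrySketch and LockConflictException are hypothetical names, and the exact counting in Seata's own retry policy may differ.

```java
import java.util.function.Supplier;

// Hypothetical bounded-retry helper; not Seata's own retry policy class.
final class LockRetrySketch {

    // Stand-in for "the global lock is held by another transaction".
    static final class LockConflictException extends RuntimeException {
        LockConflictException(String message) {
            super(message);
        }
    }

    // Runs the action once, then retries up to retryTimes more times on a lock conflict.
    static <T> T withRetry(int retryTimes, long retryIntervalMillis, Supplier<T> action) {
        int remaining = retryTimes;
        while (true) {
            try {
                return action.get();
            } catch (LockConflictException conflict) {
                if (--remaining < 0) {
                    throw conflict; // retries exhausted, give up
                }
                sleepQuietly(retryIntervalMillis);
            }
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the lock", e);
        }
    }
}
```

In this sketch, withRetry(30, 10, action) mirrors the defaults in the lock block: one initial attempt plus up to 30 retries, 10 ms apart.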

Seata loading entry point

Seata's loading entry point is io.seata.spring.boot.autoconfigure.SeataAutoConfiguration:

@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@Configuration
@EnableConfigurationProperties({SeataProperties.class})
public class SeataAutoConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);

    @Bean(BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER)
    @ConditionalOnMissingBean(name = {BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER})
    public SpringApplicationContextProvider springApplicationContextProvider() {
        return new SpringApplicationContextProvider();
    }

    @Bean(BEAN_NAME_FAILURE_HANDLER)
    @ConditionalOnMissingBean(FailureHandler.class)
    public FailureHandler failureHandler() {
        return new DefaultFailureHandlerImpl();
    }

    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Automatically configure Seata");
        }
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }

    @Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
    @ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = {"enableAutoDataSourceProxy", "enable-auto-data-source-proxy"}, havingValue = "true", matchIfMissing = true)
    @ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
    public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
        return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),seataProperties.getExcludesForAutoProxying());
    }
}

SeataProperties is the configuration class used here. It supplies the transaction group name, which is read from the following property:

spring.cloud.alibaba.fescar.tx-service-group=my_test_tx_group
	io.seata.spring.boot.autoconfigure.properties.SeataProperties.getTxServiceGroup:
	public String getTxServiceGroup() {
        if (null == txServiceGroup) {
            txServiceGroup = springCloudAlibabaConfiguration.getTxServiceGroup();
        }
        return txServiceGroup;
    }
	io.seata.spring.boot.autoconfigure.properties.SpringCloudAlibabaConfiguration.getTxServiceGroup:
    public String getTxServiceGroup() {
        if (null == txServiceGroup) {
            String applicationId = getApplicationId();
            if (null == applicationId) {
                LOGGER.warn("{} is null, please set its value", SPRING_APPLICATION_NAME_KEY);
            }
            txServiceGroup = applicationId + DEFAULT_SPRING_CLOUD_SERVICE_GROUP_POSTFIX;
        }
        return txServiceGroup;
    }

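If no group is configured, the name is generated as spring.application.name + "-seata-service-group". Startup therefore fails when spring.application.name is not set, because initClient() below rejects an empty applicationId.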
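
The fallback in the two getters above boils down to the following minimal sketch. TxServiceGroupSketch is a hypothetical stand-in, and the postfix value is taken from the description in this post rather than read from the Seata constants.

```java
// Hypothetical condensation of the two getTxServiceGroup() methods shown above.
final class TxServiceGroupSketch {

    // Assumed value of DEFAULT_SPRING_CLOUD_SERVICE_GROUP_POSTFIX, as described in this post.
    static final String POSTFIX = "-seata-service-group";

    // An explicitly configured group wins; otherwise derive one from the application name.
    static String resolve(String configuredGroup, String applicationName) {
        if (configuredGroup != null) {
            return configuredGroup;
        }
        // With a null application name, string concatenation yields "null-seata-service-group".
        return applicationName + POSTFIX;
    }
}
```

For the account-server application seen in the startup log later in this post, the derived default would be account-server-seata-service-group if no group were configured.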
With applicationId and txServiceGroup in hand, an io.seata.spring.annotation.GlobalTransactionScanner object is created. The method to look at is its initClient():

private void initClient() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Initializing Global Transaction Clients ... ");
        }
        if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
            throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
        }
        //init TM
        TMClient.init(applicationId, txServiceGroup);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
        }
        //init RM
        RMClient.init(applicationId, txServiceGroup);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
        }

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global Transaction Clients are initialized. ");
        }
        registerSpringShutdownHook();

    }

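Both TMClient and RMClient are initialized, so a single service can act as a TM and as an RM. Which role it plays in a given global transaction depends on where the @GlobalTransactional annotation sits.
Creating each client results in a Netty connection to the TC, so the startup log shows two Netty channels, with transactionRole marked as TMROLE and RMROLE respectively: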

2020-04-29 10:26:17.478  INFO 17072 --- [imeoutChecker_1] i.s.c.r.netty.NettyClientChannelManager  : will connect to 192.168.1.151:18091
2020-04-29 10:26:17.482  INFO 17072 --- [imeoutChecker_1] i.s.core.rpc.netty.NettyPoolableFactory  : NettyPool create channel to transactionRole:TMROLE,address:192.168.1.151:18091,msg:< RegisterTMRequest{applicationId='account-server', transactionServiceGroup='ydc_tx_group'} >
2020-04-29 10:26:17.534  INFO 17072 --- [imeoutChecker_2] i.s.c.r.netty.NettyClientChannelManager  : will connect to 192.168.1.151:18091
2020-04-29 10:26:17.534  INFO 17072 --- [imeoutChecker_2] io.seata.core.rpc.netty.RmRpcClient      : RM will register :jdbc:mysql://127.0.0.1:3306/seata-account
2020-04-29 10:26:17.535  INFO 17072 --- [imeoutChecker_2] i.s.core.rpc.netty.NettyPoolableFactory  : NettyPool create channel to transactionRole:RMROLE,address:192.168.1.151:18091,msg:< RegisterRMRequest{resourceIds='jdbc:mysql://127.0.0.1:3306/seata-account', applicationId='account-server', transactionServiceGroup='ydc_tx_group'} >
2020-04-29 10:26:17.904  WARN 17072 --- [lector_RMROLE_1] i.s.common.loader.EnhancedServiceLoader  : load [io.seata.serializer.hessian.HessianSerializer] class fail. com/caucho/hessian/io/AbstractHessianOutput
2020-04-29 10:26:17.905  INFO 17072 --- [lector_RMROLE_1] i.s.common.loader.EnhancedServiceLoader  : load Serializer[SEATA] extension by class[io.seata.serializer.seata.SeataSerializer]
2020-04-29 10:26:18.030  INFO 17072 --- [imeoutChecker_2] io.seata.core.rpc.netty.RmRpcClient      : register RM success. server version:1.1.0,channel:[id: 0x47882342, L:/192.168.1.192:49367 - R:/192.168.1.151:18091]
2020-04-29 10:26:18.040  INFO 17072 --- [imeoutChecker_1] i.s.core.rpc.netty.NettyPoolableFactory  : register success, cost 195 ms, version:1.1.0,role:TMROLE,channel:[id: 0x46ae7e39, L:/192.168.1.192:49368 - R:/192.168.1.151:18091]
2020-04-29 10:26:18.040  INFO 17072 --- [imeoutChecker_2] i.s.core.rpc.netty.NettyPoolableFactory  : register success, cost 205 ms, version:1.1.0,role:RMROLE,channel:[id: 0x47882342, L:/192.168.1.192:49367 - R:/192.168.1.151:18091]

TM processing flow

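One of the TM's jobs is to start the global transaction. In practice you put the @GlobalTransactional annotation on the method that should start it. Here is what the annotation does:

An interceptor is involved, io.seata.spring.annotation.GlobalTransactionalInterceptor: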

// partial code
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {

    private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionalInterceptor.class);
    private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();

    private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
    private final GlobalLockTemplate<Object> globalLockTemplate = new GlobalLockTemplate<>();
    private final FailureHandler failureHandler;
    private volatile boolean disable;

    /**
     * Instantiates a new Global transactional interceptor.
     *
     * @param failureHandler the failure handler
     */
    public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
        this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
        this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
            DEFAULT_DISABLE_GLOBAL_TRANSACTION);
    }

    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis())
            : null;
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

        final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
        if (!disable && globalTransactionalAnnotation != null) {
            return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
        } else if (!disable && globalLockAnnotation != null) {
            return handleGlobalLock(methodInvocation);
        } else {
            return methodInvocation.proceed();
        }
    }
}

If Seata's distributed transactions are enabled and the method carries @GlobalTransactional, handleGlobalTransaction() runs:
io.seata.spring.annotation.GlobalTransactionalInterceptor.handleGlobalTransaction()

private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
                                           final GlobalTransactional globalTrxAnno) throws Throwable {
        try {
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {
                    return methodInvocation.proceed();
                }

                public String name() {
                    String name = globalTrxAnno.name();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }

                @Override
                public TransactionInfo getTransactionInfo() {
                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
                    transactionInfo.setName(name());
                    transactionInfo.setPropagation(globalTrxAnno.propagation());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackRetrying:
                    failureHandler.onRollbackRetrying(e.getTransaction(), e.getCause());
                    throw e.getCause();
                default:
                    throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));

            }
        }
    }

TransactionalTemplate defines the standard steps the TM follows to process a global transaction:
io.seata.tm.api.TransactionalTemplate.execute()

    public Object execute(TransactionalExecutor business) throws Throwable {
        // 1 get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        // 1.1 get or create a transaction
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 1.2 Handle the Transaction propatation and the branchType
        Propagation propagation = txInfo.getPropagation();
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            switch (propagation) {
                case NOT_SUPPORTED:
                    suspendedResourcesHolder = tx.suspend(true);
                    return business.execute();
                case REQUIRES_NEW:
                    suspendedResourcesHolder = tx.suspend(true);
                    break;
                case SUPPORTS:
                    if (!existingTransaction()) {
                        return business.execute();
                    }
                    break;
                case REQUIRED:
                    break;
                case NEVER:
                    if (existingTransaction()) {
                        throw new TransactionException(
                                String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s"
                                        ,RootContext.getXID()));
                    } else {
                        return business.execute();
                    }
                case MANDATORY:
                    if (!existingTransaction()) {
                        throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }


            try {

                // 2. begin transaction
                beginTransaction(txInfo, tx);

                Object rs = null;
                try {

                    // Do Your Business
                    rs = business.execute();

                } catch (Throwable ex) {

                    // 3.the needed business exception to rollback.
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

                // 4. everything is fine, commit.
                commitTransaction(tx);

                return rs;
            } finally {
                //5. clear
                triggerAfterCompletion();
                cleanUp();
            }
        } finally {
            tx.resume(suspendedResourcesHolder);
        }

    }
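
Stripped of propagation handling and rollback rules, the skeleton of execute() above is a classic template method: begin, run the business code, roll back on failure or commit on success, and always clean up. The sketch below records the steps instead of talking to a TC. TemplateSketch is a hypothetical class, and unlike the real template it rolls back on every runtime exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified mirror of the begin / execute / rollback-or-commit / cleanup order.
final class TemplateSketch {

    // Records which step ran, in order, so the flow can be inspected.
    final List<String> steps = new ArrayList<>();

    <T> T execute(Supplier<T> business) {
        try {
            steps.add("begin");             // 2. begin transaction
            T result;
            try {
                result = business.get();    // run the business code
            } catch (RuntimeException ex) {
                steps.add("rollback");      // 3. business failure triggers rollback
                throw ex;
            }
            steps.add("commit");            // 4. everything is fine, commit
            return result;
        } finally {
            steps.add("cleanup");           // 5. clear, runs on both paths
        }
    }
}
```

The business exception is rethrown after the rollback step, so the caller still sees the original failure, just as execute() rethrows ex after completeTransactionAfterThrowing().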

The call GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); determines the caller's role in the global transaction:

	//io.seata.tm.api.GlobalTransactionContext.getCurrentOrCreate()
	public static GlobalTransaction getCurrentOrCreate() {
        GlobalTransaction tx = getCurrent();
        if (tx == null) {
            return createNew();
        }
        return tx;
    }
    //io.seata.tm.api.GlobalTransactionContext.getCurrent()
    private static GlobalTransaction getCurrent() {
        String xid = RootContext.getXID();
        if (xid == null) {
            return null;
        }
        return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
    }
    //io.seata.tm.api.GlobalTransactionContext.createNew()
    private static GlobalTransaction createNew() {
        return new DefaultGlobalTransaction();
    }
    //io.seata.tm.api.DefaultGlobalTransaction.DefaultGlobalTransaction()
    DefaultGlobalTransaction() {
        this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
    }

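The role is decided by whether an XID already exists in the current context: with no XID the caller is the Launcher (initiator), and with an existing XID it is a Participant.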
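
The rule can be tried out in isolation with a small sketch. RoleSketch is hypothetical, and its per-thread XID slot is an assumption standing in for RootContext; it only mirrors the "XID present or not" decision shown above.

```java
// Hypothetical stand-in for the role decision in GlobalTransactionContext.
final class RoleSketch {

    enum Role { LAUNCHER, PARTICIPANT }

    // Assumption for this sketch: one XID slot per thread, playing the part of RootContext.
    private static final ThreadLocal<String> CURRENT_XID = new ThreadLocal<>();

    static void bind(String xid) {
        CURRENT_XID.set(xid);
    }

    // Removes the XID from the current thread and returns the value that was bound.
    static String unbind() {
        String xid = CURRENT_XID.get();
        CURRENT_XID.remove();
        return xid;
    }

    // No XID in the context: this caller starts the transaction. Otherwise it joins one.
    static Role currentRole() {
        return CURRENT_XID.get() == null ? Role.LAUNCHER : Role.PARTICIPANT;
    }
}
```

A service that receives a request carrying an XID therefore becomes a participant even if its own method is annotated with @GlobalTransactional.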

Starting the global transaction:
io.seata.tm.api.TransactionalTemplate.beginTransaction()

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            triggerBeforeBegin();
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
            triggerAfterBegin();
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);

        }
    }

io.seata.tm.api.DefaultGlobalTransaction.begin()

	@Override
    public void begin(int timeout, String name) throws TransactionException {
        if (role != GlobalTransactionRole.Launcher) {
            assertXIDNotNull();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNull();
        if (RootContext.getXID() != null) {
            throw new IllegalStateException();
        }
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [{}]", xid);
        }

    }

A participant returns immediately. A launcher runs the actual begin call and obtains the XID returned by the TC:
xid = transactionManager.begin(null, null, name, timeout);

io.seata.tm.DefaultTransactionManager handles communication between the TM and the TC and sends the begin, commit, and rollback commands:

public class DefaultTransactionManager implements TransactionManager {

    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
        }
        return response.getXid();
    }

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setXid(xid);
        GlobalCommitResponse response = (GlobalCommitResponse)syncCall(globalCommit);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setXid(xid);
        GlobalRollbackResponse response = (GlobalRollbackResponse)syncCall(globalRollback);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus getStatus(String xid) throws TransactionException {
        GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
        queryGlobalStatus.setXid(xid);
        GlobalStatusResponse response = (GlobalStatusResponse)syncCall(queryGlobalStatus);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus globalReport(String xid, GlobalStatus globalStatus) throws TransactionException {
        GlobalReportRequest globalReport = new GlobalReportRequest();
        globalReport.setXid(xid);
        globalReport.setGlobalStatus(globalStatus);
        GlobalReportResponse response = (GlobalReportResponse) syncCall(globalReport);
        return response.getGlobalStatus();
    }

    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            return (AbstractTransactionResponse)TmRpcClient.getInstance().sendMsgWithResponse(request);
        } catch (TimeoutException toe) {
            throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
        }
    }
}

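Once the XID comes back from the TC, the global transaction has started. Execution then moves on to business.execute(), that is, our business code, and enters the RM processing flow.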

RM processing flow

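How does an RM join the global transaction? Through the Seata data source proxy mentioned earlier. DataSourceProxy gives Seata a hook at the point where the business code commits its local transaction, and Seata uses that hook to send the RM's result to the TC.
Because the business code's commit is proxied by ConnectionProxy, committing the local transaction actually runs ConnectionProxy's commit method: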

// partial code
public class ConnectionProxy extends AbstractConnectionProxy {
	@Override
    public void commit() throws SQLException {
        try {
            LOCK_RETRY_POLICY.execute(() -> {
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }

    private void doCommit() throws SQLException {
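        // if we are inside a global transaction, take the global-transaction commit path
        // being in a global transaction simply means an XID exists in the current context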
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }

    private void processLocalCommitWithGlobalLocks() throws SQLException {
        checkLock(context.buildLockKeys());
        try {
            targetConnection.commit();
        } catch (Throwable ex) {
            throw new SQLException(ex);
        }
        context.reset();
    }

    private void processGlobalTransactionCommit() throws SQLException {
        try {
            // first register the branch with the TC and get the branchId it assigns
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            // write the undo log
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            // commit the local transaction; the undo log and the business data are written in the same local transaction
            targetConnection.commit();
        } catch (Throwable ex) {
            // notify the TC that this RM's branch transaction failed
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            report(false);
            throw new SQLException(ex);
        }
        // notify the TC that this RM's branch transaction succeeded (only when reportSuccessEnable is true)
        if (IS_REPORT_SUCCESS_ENABLE) {
            report(true);
        }
        context.reset();
    }
    // register the branch: build the request and send it to the TC over Netty
    // the returned branchId is stored in the context
    private void register() throws TransactionException {
        if (!context.hasUndoLog() || context.getLockKeysBuffer().isEmpty()) {
            return;
        }
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
            null, context.getXid(), null, context.buildLockKeys());
        context.setBranchId(branchId);
    }
}

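The steps in detail:
1. Get the XID passed from business-service
2. Bind the XID to the current context
3. Execute the business SQL
4. Create this RM's Netty connection to the TC
5. Send the branch transaction information to the TC
6. Receive the branchId returned by the TC
7. Record the undo log data
8. Report the result of this transaction's phase one to the TC
9. Unbind the XID from the current context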

Steps 1 and 9 are not implemented by Seata itself. You either implement them yourself or add the spring-cloud-alibaba-seata dependency, whose com.alibaba.cloud.seata.web.SeataHandlerInterceptor binds the XID to the context and unbinds it afterwards.

public class SeataHandlerInterceptor implements HandlerInterceptor {
    private static final Logger log = LoggerFactory.getLogger(SeataHandlerInterceptor.class);

    public SeataHandlerInterceptor() {
    }

    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        String xid = RootContext.getXID();
        String rpcXid = request.getHeader("TX_XID");
        if(log.isDebugEnabled()) {
            log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid);
        }

        if(xid == null && rpcXid != null) {
            RootContext.bind(rpcXid);
            if(log.isDebugEnabled()) {
                log.debug("bind {} to RootContext", rpcXid);
            }
        }

        return true;
    }

    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception e) {
        String rpcXid = request.getHeader("TX_XID");
        if(!StringUtils.isEmpty(rpcXid)) {
            String unbindXid = RootContext.unbind();
            if(log.isDebugEnabled()) {
                log.debug("unbind {} from RootContext", unbindXid);
            }

            if(!rpcXid.equalsIgnoreCase(unbindXid)) {
                log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
                if(unbindXid != null) {
                    RootContext.bind(unbindXid);
                    log.warn("bind {} back to RootContext", unbindXid);
                }
            }

        }
    }
}
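
The interceptor above covers only the receiving side. For it to find anything, the calling service has to put its XID into the same TX_XID header. The caller side is not shown in this post, so the sketch below is an assumption about the round trip, using plain maps in place of HTTP headers and a hypothetical Context class in place of RootContext.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical round trip of an XID through a request header.
final class XidHeaderSketch {

    // Same header name that SeataHandlerInterceptor reads above.
    static final String HEADER = "TX_XID";

    // Stand-in for the per-service transaction context.
    static final class Context {
        private String xid;

        void bind(String xid) {
            this.xid = xid;
        }

        String unbind() {
            String old = xid;
            xid = null;
            return old;
        }

        String getXid() {
            return xid;
        }
    }

    // Caller side: copy the XID, if there is one, into the outgoing headers.
    static Map<String, String> outgoingHeaders(Context caller) {
        Map<String, String> headers = new HashMap<>();
        if (caller.getXid() != null) {
            headers.put(HEADER, caller.getXid());
        }
        return headers;
    }

    // Callee side, before the handler runs: bind the incoming XID if none is bound yet.
    static void preHandle(Context callee, Map<String, String> headers) {
        String rpcXid = headers.get(HEADER);
        if (callee.getXid() == null && rpcXid != null) {
            callee.bind(rpcXid);
        }
    }

    // Callee side, after the request completes: unbind so the XID does not leak into later requests.
    static void afterCompletion(Context callee, Map<String, String> headers) {
        if (headers.get(HEADER) != null) {
            callee.unbind();
        }
    }
}
```

Unbinding matters because request threads are usually pooled: an XID left behind would turn an unrelated later request into a participant of a finished transaction.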

At this point the RM has finished its phase-one work. Next comes the phase-two logic.

Transaction commit

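When the RM starts, it opens a Netty connection to the TC. After the TC has collected the results reported by the RMs, it sends each RM a commit or rollback command.
Here is the commit path in detail: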

io.seata.rm.AbstractRMHandler.doBranchCommit():

protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
        throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
        }
        BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch commit result: " + status);
        }

    }

It reads xid, branchId, resourceId, applicationData, and branchType from the request. branchType is an enum:

public enum BranchType {

    /**
     * The At.
     */
    // AT Branch
    AT,
    
    /**
     * The TCC.
     */
    TCC,

    /**
     * The SAGA.
     */
    SAGA,

    /**
     * The XA.
     */
    XA;

    /**
     * Get branch type.
     *
     * @param ordinal the ordinal
     * @return the branch type
     */
    public static BranchType get(byte ordinal) {
        return get((int)ordinal);
    }

    /**
     * Get branch type.
     *
     * @param ordinal the ordinal
     * @return the branch type
     */
    public static BranchType get(int ordinal) {
        for (BranchType branchType : BranchType.values()) {
            if (branchType.ordinal() == ordinal) {
                return branchType;
            }
        }
        throw new IllegalArgumentException("Unknown BranchType[" + ordinal + "]");
    }
}

branchCommit then runs and adds the XID to be committed to a buffer queue:
io.seata.rm.datasource.AsyncWorker.branchCommit():

@Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
        if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
            LOGGER.warn("Async commit buffer is FULL. Rejected branch [{}/{}] will be handled by housekeeping later.", branchId, xid);
        }
        return BranchStatus.PhaseTwo_Committed;
    }

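Because the local transaction was already committed in phase one, a global commit only needs to delete the matching undo_log records asynchronously, which is done as follows: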
io.seata.rm.datasource.AsyncWorker:

// a scheduled task consumes the pending XIDs from the buffer
public synchronized void init() {
        LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
        ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true));
        timerExecutor.scheduleAtFixedRate(() -> {
            try {

                doBranchCommits();

            } catch (Throwable e) {
                LOGGER.info("Failed at async committing ... {}", e.getMessage());

            }
        }, 10, 1000 * 1, TimeUnit.MILLISECONDS);
    }

    private void doBranchCommits() {
        if (ASYNC_COMMIT_BUFFER.isEmpty()) {
            return;
        }

        Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
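        // each run of the scheduled task drains all pending entries from ASYNC_COMMIT_BUFFER
        // and groups them by resourceId; a resourceId is a database connection URL,
        // as seen in the log earlier. The grouping handles applications with multiple data sources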
        while (!ASYNC_COMMIT_BUFFER.isEmpty()) {
            Phase2Context commitContext = ASYNC_COMMIT_BUFFER.poll();
            List<Phase2Context> contextsGroupedByResourceId = mappedContexts.computeIfAbsent(commitContext.resourceId, k -> new ArrayList<>());
            contextsGroupedByResourceId.add(commitContext);
        }

        for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
            Connection conn = null;
            DataSourceProxy dataSourceProxy;
            try {
                try {
                    DataSourceManager resourceManager = (DataSourceManager) DefaultResourceManager.get()
                        .getResourceManager(BranchType.AT);
                    dataSourceProxy = resourceManager.get(entry.getKey());
                    if (dataSourceProxy == null) {
                        throw new ShouldNeverHappenException("Failed to find resource on " + entry.getKey());
                    }
                    conn = dataSourceProxy.getPlainConnection();
                } catch (SQLException sqle) {
                    LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
                    continue;
                }
                List<Phase2Context> contextsGroupedByResourceId = entry.getValue();
                Set<String> xids = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
                Set<Long> branchIds = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
                for (Phase2Context commitContext : contextsGroupedByResourceId) {
                    xids.add(commitContext.xid);
                    branchIds.add(commitContext.branchId);
                    int maxSize = Math.max(xids.size(), branchIds.size());
                    if (maxSize == UNDOLOG_DELETE_LIMIT_SIZE) {
                        try {
                            // delete the corresponding undo_log records
                            UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(
                                xids, branchIds, conn);
                        } catch (Exception ex) {
                            LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
                        }
                        xids.clear();
                        branchIds.clear();
                    }
                }

                if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {
                    return;
                }

                try {
                    UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids,
                        branchIds, conn);
                } catch (Exception ex) {
                    LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
                }

                if (!conn.getAutoCommit()) {
                    conn.commit();
                }
            } catch (Throwable e) {
                LOGGER.error(e.getMessage(), e);
                try {
                    conn.rollback();
                } catch (SQLException rollbackEx) {
                    LOGGER.warn("Failed to rollback JDBC resource while deleting undo_log ", rollbackEx);
                }
            } finally {
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (SQLException closeEx) {
                        LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
                    }
                }
            }
        }
    }
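
The grouping and batching in the method above can be isolated into a small sketch. This is an illustration under assumptions, not Seata's code: BatchDeleteSketch, Ctx, groupByResource and toBatches are hypothetical names, Ctx stands in for Phase2Context, and the chunking counts contexts rather than the sizes of the deduplicated xid and branchId sets that the listing checks against UNDOLOG_DELETE_LIMIT_SIZE.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BatchDeleteSketch {

    // Hypothetical stand-in for Seata's Phase2Context, reduced to the fields used here.
    static final class Ctx {
        final String xid;
        final long branchId;
        final String resourceId;

        Ctx(String xid, long branchId, String resourceId) {
            this.xid = xid;
            this.branchId = branchId;
            this.resourceId = resourceId;
        }
    }

    // Group pending contexts by resourceId, keeping arrival order within each group.
    static Map<String, List<Ctx>> groupByResource(List<Ctx> pending) {
        Map<String, List<Ctx>> grouped = new LinkedHashMap<>();
        for (Ctx ctx : pending) {
            grouped.computeIfAbsent(ctx.resourceId, k -> new ArrayList<>()).add(ctx);
        }
        return grouped;
    }

    // Cut one group into chunks of at most `limit` contexts;
    // each chunk would correspond to one batched undo_log delete.
    static List<List<Ctx>> toBatches(List<Ctx> group, int limit) {
        if (limit < 1) {
            throw new IllegalArgumentException("limit must be positive");
        }
        List<List<Ctx>> batches = new ArrayList<>();
        for (int from = 0; from < group.size(); from += limit) {
            int to = Math.min(from + limit, group.size());
            batches.add(new ArrayList<>(group.subList(from, to)));
        }
        return batches;
    }
}
```

Grouping first means one connection per data source is enough for all of its pending branches, and chunking keeps each delete statement bounded in size.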

Transaction rollback

A rollback can be triggered in two ways.
1. A branch transaction fails while it is being processed, i.e. wherever io.seata.rm.datasource.ConnectionProxy calls report(false):

private void report(boolean commitDone) throws SQLException {
        if (context.getBranchId() == null) {
            return;
        }
        int retry = REPORT_RETRY_COUNT;
        while (retry > 0) {
            try {
                DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(),
                    commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed, null);
                return;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report [" + context.getBranchId() + "/" + context.getXid() + "] commit done ["
                    + commitDone + "] Retry Countdown: " + retry);
                retry--;

                if (retry == 0) {
                    throw new SQLException("Failed to report branch status " + commitDone, ex);
                }
            }
        }
    }

report(false) is called from two places:

    private void processGlobalTransactionCommit() throws SQLException {
        try {
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            report(false);
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            report(true);
        }
        context.reset();
    }
    
    @Override
    public void rollback() throws SQLException {
        targetConnection.rollback();
        if (context.inGlobalTransaction() && context.isBranchRegistered()) {
            report(false);
        }
        context.reset();
    }

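2. The TM catches an exception thrown up from a downstream system, i.e. an exception caught in the method that starts the global transaction and carries the @GlobalTransactional annotation. In the execute template method of the TransactionalTemplate class shown earlier, the call to business.execute() is wrapped in a catch block. Once an exception is caught, completeTransactionAfterThrowing(txInfo, tx, ex) is called, and the TM notifies the TC that the transaction for the corresponding XID must be rolled back.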

    private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable ex) throws TransactionalExecutor.ExecutionException {
        //roll back
        if (txInfo != null && txInfo.rollbackOn(ex)) {
            try {
                rollbackTransaction(tx, ex);
            } catch (TransactionException txe) {
                // Failed to rollback
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                        TransactionalExecutor.Code.RollbackFailure, ex);
            }
        } else {
            // not roll back on this exception, so commit
            commitTransaction(tx);
        }
    }
    private void rollbackTransaction(GlobalTransaction tx, Throwable ex) throws TransactionException, TransactionalExecutor.ExecutionException {
        triggerBeforeRollback();
        tx.rollback();
        triggerAfterRollback();
        // 3.1 Successfully rolled back
        throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
            ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, ex);
    }
    @Override
    public void rollback() throws TransactionException {
        if (role == GlobalTransactionRole.Participant) {
            // only the Launcher may initiate this rollback
            // Participant has no responsibility of rollback
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNotNull();

        int retry = ROLLBACK_RETRY_COUNT;
        try {
            while (retry > 0) {
                try {
                    status = transactionManager.rollback(xid);
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    retry--;
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global rollback", ex);
                    }
                }
            }
        } finally {
            if (RootContext.getXID() != null && xid.equals(RootContext.getXID())) {
                suspend(true);
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] rollback status: {}", xid, status);
        }
    }
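
Both report() in ConnectionProxy and rollback() above use the same countdown retry: try the call, decrement the counter on failure, and give up with an exception when the counter reaches zero. A minimal sketch of that pattern follows. RetrySketch and callWithRetry are hypothetical names introduced for illustration, and the sketch only retries on RuntimeException, whereas the listings catch Throwable.

```java
import java.util.function.Supplier;

class RetrySketch {

    // Runs `action` up to maxAttempts times and returns the first successful result.
    // If every attempt throws, the last failure is wrapped and rethrown.
    static <T> T callWithRetry(Supplier<T> action, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int remaining = maxAttempts; remaining > 0; remaining--) {
            try {
                return action.get();
            } catch (RuntimeException ex) {
                last = ex; // remember the failure and count down
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts", last);
    }
}
```

As in the listings, there is no delay between attempts; a caller that needs backoff would have to add it inside the action or around the loop.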

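After aggregating the results, the TC sends a rollback command to the participants. The RM receives this rollback notification in the doBranchRollback method of the AbstractRMHandler class: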

protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
        throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
        }
        BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacked result: " + status);
        }
    }

It then executes the branchRollback method of io.seata.rm.datasource.DataSourceManager:

@Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
        DataSourceProxy dataSourceProxy = get(resourceId);
        if (dataSourceProxy == null) {
            throw new ShouldNeverHappenException();
        }
        try {
            UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
        } catch (TransactionException te) {
            StackTraceLogger.info(LOGGER, te,
                "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
                new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
        }
        return BranchStatus.PhaseTwo_Rollbacked;

    }

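Finally, the undo method of io.seata.rm.datasource.undo.AbstractUndoLogManager is executed.
The undo flow in detail:
1. Look up the undo log committed in phase one by xid and branchId.
2. If it is found, generate compensating SQL from the data recorded in the undo log and execute it, i.e. restore the data modified in phase one.
3. After step 2 completes, delete that undo log record.
4. If step 1 finds no matching undo log, insert an undo log record with the state GlobalFinished.
The undo log may be missing because the phase-one local transaction hit an exception and never wrote it. Since xid and branchId form a unique index, the record inserted in step 4 stops phase one from writing its undo log successfully later on: the late insert violates the unique index, the phase-one local transaction fails, and the business data is never committed, so the end result is the same as a rollback.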
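
The four steps, and the way the GlobalFinished record blocks a late phase-one write, can be simulated in memory. This is a sketch under assumptions: UndoLogGuardSketch and its methods are hypothetical, a map key plays the role of the unique index on xid and branchId, and the actual replay of compensating SQL is reduced to a comment.

```java
import java.util.HashMap;
import java.util.Map;

class UndoLogGuardSketch {

    enum State { Normal, GlobalFinished }

    // key = xid + ":" + branchId, emulating the unique index on (xid, branchId)
    private final Map<String, State> undoLog = new HashMap<>();

    private static String key(String xid, long branchId) {
        return xid + ":" + branchId;
    }

    // Phase one: write the undo log. Returns false when a record with the
    // same key already exists, which in a database is a unique-key violation.
    boolean phaseOneInsert(String xid, long branchId) {
        return undoLog.putIfAbsent(key(xid, branchId), State.Normal) == null;
    }

    // Phase-two rollback following steps 1 to 4 above.
    String rollback(String xid, long branchId) {
        String k = key(xid, branchId);
        State state = undoLog.get(k);            // step 1: look up the undo log
        if (state == null) {
            undoLog.put(k, State.GlobalFinished); // step 4: leave a marker record
            return "marker-inserted";
        }
        if (state != State.Normal) {
            return "ignored";                     // only records in the normal state are replayed
        }
        // step 2 would replay the compensating SQL here
        undoLog.remove(k);                        // step 3: delete the undo log
        return "undone";
    }
}
```

Calling rollback before phaseOneInsert for the same xid and branchId leaves the marker in place, so the later phaseOneInsert returns false; this corresponds to the phase-one local transaction failing instead of committing data that nobody would undo.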

@Override
    public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement selectPST = null;
        boolean originalAutoCommit = true;

        for (; ; ) {
            try {
                conn = dataSourceProxy.getPlainConnection();

                // The entire undo process should run in a local transaction.
                if (originalAutoCommit = conn.getAutoCommit()) {
                    conn.setAutoCommit(false);
                }

                // Find UNDO LOG
                selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                selectPST.setLong(1, branchId);
                selectPST.setString(2, xid);
                rs = selectPST.executeQuery();

                boolean exists = false;
                while (rs.next()) {
                    exists = true;

                    // It is possible that the server repeatedly sends a rollback request to roll back
                    // the same branch transaction to multiple processes,
                    // ensuring that only the undo_log in the normal state is processed.
                    int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                    if (!canUndo(state)) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
                        }
                        return;
                    }

                    String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                    Map<String, String> context = parseContext(contextString);
                    byte[] rollbackInfo = getRollbackInfo(rs);

                    String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                    UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                        : UndoLogParserFactory.getInstance(serializer);
                    BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                    try {
                        // put serializer name to local
                        setCurrentSerializer(parser.getName());
                        List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                        if (sqlUndoLogs.size() > 1) {
                            Collections.reverse(sqlUndoLogs);
                        }
                        for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                            TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                                conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                            sqlUndoLog.setTableMeta(tableMeta);
                            AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                dataSourceProxy.getDbType(), sqlUndoLog);
                            undoExecutor.executeOn(conn);
                        }
                    } finally {
                        // remove serializer name
                        removeCurrentSerializer();
                    }
                }

                // If undo_log exists, it means that the branch transaction has completed the first phase,
                // we can directly roll back and clean the undo_log
                // Otherwise, it indicates that there is an exception in the branch transaction,
                // causing undo_log not to be written to the database.
                // For example, the business processing timeout, the global transaction is the initiator rolls back.
                // To ensure data consistency, we can insert an undo_log with GlobalFinished state
                // to prevent the local transaction of the first phase of other programs from being correctly submitted.
                // See https://github.com/seata/seata/issues/489

                if (exists) {
                    deleteUndoLog(xid, branchId, conn);
                    conn.commit();
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
                            State.GlobalFinished.name());
                    }
                } else {
                    insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                    conn.commit();
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
                            State.GlobalFinished.name());
                    }
                }

                return;
            } catch (SQLIntegrityConstraintViolationException e) {
                // Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
                }
            } catch (Throwable e) {
                if (conn != null) {
                    try {
                        conn.rollback();
                    } catch (SQLException rollbackEx) {
                        LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
                    }
                }
                throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
                    .format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
                        branchId, e.getMessage()), e);

            } finally {
                try {
                    if (rs != null) {
                        rs.close();
                    }
                    if (selectPST != null) {
                        selectPST.close();
                    }
                    if (conn != null) {
                        if (originalAutoCommit) {
                            conn.setAutoCommit(true);
                        }
                        conn.close();
                    }
                } catch (SQLException closeEx) {
                    LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
                }
            }
        }
    }
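
One detail in the listing is worth isolating: when a branch holds more than one SQLUndoLog, the list is reversed before replay, so the most recent statement is undone first. The sketch below shows why the order matters when two statements touch the same row. ReverseUndoSketch and UndoEntry are hypothetical names, and a map of integers stands in for a table; the real executors build and run SQL from the recorded images.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class ReverseUndoSketch {

    // Hypothetical stand-in for one SQLUndoLog: a row key and its before-image.
    static final class UndoEntry {
        final String rowKey;
        final Integer beforeValue; // null means the row did not exist before the statement

        UndoEntry(String rowKey, Integer beforeValue) {
            this.rowKey = rowKey;
            this.beforeValue = beforeValue;
        }
    }

    // Restore before-images newest first, so the oldest image is applied last and wins.
    static void undoAll(Map<String, Integer> table, List<UndoEntry> undoLog) {
        List<UndoEntry> newestFirst = new ArrayList<>(undoLog);
        Collections.reverse(newestFirst);
        for (UndoEntry entry : newestFirst) {
            if (entry.beforeValue == null) {
                table.remove(entry.rowKey); // the statement was an insert: remove the row
            } else {
                table.put(entry.rowKey, entry.beforeValue);
            }
        }
    }
}
```

If a row went from 10 to 9 and then from 9 to 8 inside one branch, replaying in recorded order would restore 10 and then overwrite it with 9; replaying in reverse restores 9 and then 10, which is the state before the branch started.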

Reference: https://blog.csdn.net/f4761/article/details/89077400


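Copyright notice: this is an original article by weixin_45661382, released under the CC 4.0 BY-SA license. When reposting, please include a link to the original source and this notice.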