Seata流程源码梳理上篇-TM、RM处理

  • Post author:
  • Post category:其他


这一篇我们主要来分析下Seata的AT模式的流程处理。



一、流程案例



1、案例源码

​ 我们本地流程梳理用的是基于

spring-cloud

框架,注册中心是eurak,服务间调用的是feign,源码下载的是官网的(当然你如果对dubbo更熟悉,也可以使用dubbo结构的),案例地址–https://github.com/seata/seata-samples。

在这里插入图片描述

在这里插入图片描述

​ 这个我本地的话,将其他的模块都删除了。然后启动的话,注意数据库连接信息的修改以及脚本初始化。

​ 这个业务是一个用户账号(

account

)下订单(

order

),然后减库存(

stock

),整个业务的发起就是在

business

发起的。

在这里插入图片描述

在这里插入图片描述



2、服务器源码

​ 然后seata服务端,我下载的1.4.2版本的源码在本地启动。

在这里插入图片描述

​ 对于服务端的启动,用的是db模式

在这里插入图片描述

​ 注册的话,修改为

eurka

在这里插入图片描述

​ 注意运行对应的数据库脚本。



二、一些前置概念的介绍

​ 可以先再看下其官网的介绍–http://seata.io/zh-cn/docs/overview/what-is-seata.html。然后我们结合源码再来说下。



1、整个案例的TM、RM、TC业务逻辑

在这里插入图片描述


TC (Transaction Coordinator) – 事务协调者

维护全局和分支事务的状态,驱动全局事务提交或回滚。


TM (Transaction Manager) – 事务管理器

定义全局事务的范围:开始全局事务、提交或回滚全局事务。


RM (Resource Manager) – 资源管理器

管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

​ 这里主要我们需要明白TM、TC的概念,TM其实就是全局事务的发起者,TC就是协调者,例如如果全局事务需要回滚或者提交,就是TM去通知TC,然后TC再获取、调用该全局事务下的各个分支,通知他们去进行提交或回滚。

​ TC其实就是我们的seata服务端,如果我们要开启全局事务,通过

TM

(对于我们代码来说,TM也就是

TransactionManager

接口)发起(发起也就是调用

begin

方法,获取到本次分布式事务唯一全局标识

xid

),也就是我们在调用

businessService.purchase("U100000", "C100000", 30)

方法时,就会通过拦截器拦截,进行对应分布式事务的逻辑织入,然后在里面调用

TC

,告诉它我们要开始分布式事务了,然后

TC

就是在

global_table

添加一条记录,就是本次分布式事务的唯一标识记录:

CREATE TABLE `global_table` (
  `xid` varchar(128) NOT NULL,
  `transaction_id` bigint DEFAULT NULL,
  `status` tinyint NOT NULL,
  `application_id` varchar(64) DEFAULT NULL,
  `transaction_service_group` varchar(64) DEFAULT NULL,
  `transaction_name` varchar(64) DEFAULT NULL,
  `timeout` int DEFAULT NULL,
  `begin_time` bigint DEFAULT NULL,
  `application_data` varchar(2000) DEFAULT NULL,
  `gmt_create` datetime DEFAULT NULL,
  `gmt_modified` datetime DEFAULT NULL,
  PRIMARY KEY (`xid`),
  KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
  KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

在这里插入图片描述

​ 然后

TC

(也就是seata服务端)将

xid

返回,之后就会具体来调用到业务方法,也就是

purchase(...)

方法的业务逻辑:

@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
    stockFeignClient.deduct(commodityCode, orderCount);

    orderFeignClient.create(userId, commodityCode, orderCount);

    if (!validData()) {
        throw new RuntimeException("账户或库存不足,执行回滚");
    }
}

​ 然后我们就会通过

feign

去调用

stock

服务以及

order

服务,在调用的时候,将

xid

加在请求头部中。



stock

接受到调用后,

seata

就会通过拓展spring提供的

HandlerInterceptor

接口(具体可以去了解下这个接口)实现

SeataHandlerInterceptor

,从头部中获取到xid,然后绑定到当前线程中,然后去执行对应的业务。

​ 对应的业务方法就会去操作DB,而这个时候,seata同样有个织入逻辑,也就是代理spring本身的

DataSource



DataSourceProxy

类,当操作数据库时,通过

DataSource

获取

Connection

时,其又会返回

ConnectionProxy

类来代理本身的

Connection

,然后在操作数据库之前、之后,添加对应的

seata

逻辑。例如:

​ 1、记录

undo_log

表的数据,用于回滚时候的恢复重做

CREATE TABLE `undo_log` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `branch_id` bigint NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=147 DEFAULT CHARSET=utf8mb3;

在这里插入图片描述

​ 2、通过

RM

(也就是

ResourceManager

接口)进行分支注册(调用

ResourceManager

接口的branchRegister方法),也就是调用

TC

告诉它,然后

TC

就会在

branch_table

表添加一条记录,这里的一条记录,就是一个

TM

对应的数据

CREATE TABLE `branch_table` (
  `branch_id` bigint NOT NULL,
  `xid` varchar(128) NOT NULL,
  `transaction_id` bigint DEFAULT NULL,
  `resource_group_id` varchar(32) DEFAULT NULL,
  `resource_id` varchar(256) DEFAULT NULL,
  `lock_key` varchar(128) DEFAULT NULL,
  `branch_type` varchar(8) DEFAULT NULL,
  `status` tinyint DEFAULT NULL,
  `client_id` varchar(64) DEFAULT NULL,
  `application_data` varchar(2000) DEFAULT NULL,
  `gmt_create` datetime DEFAULT NULL,
  `gmt_modified` datetime DEFAULT NULL,
  PRIMARY KEY (`branch_id`),
  KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

在这里插入图片描述

​ 然后当

purchase(String userId, String commodityCode, int orderCount)

方法调用

order



stock

成功完成后,这个时候就会在进入到

TM

也就是对应的拦截器逻辑中,如果成功执行分布式事务提交操作,失败就会捕获到异常,进行全局事务的回滚操作。例如

TM

调用

TC

告诉它完成分布式事务的提交,然后

TC

就会通过

xid

获取到当前事务下有几个分支,也就是对应

RM

完成提交(

ResourceManager



branchCommit

方法),这个就是整个seata定义分布式事务的基本逻辑。



2、TM(TransactionManager)

public interface TransactionManager {

    /**
     * 开启一个全局事务(也就是调用),返回xid,也就是本次全局事务的标识
     */
    String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException;

    /**
     * Global commit.
     *
     * @param xid XID of the global transaction.
     * @return Status of the global transaction after committing.
     * @throws TransactionException Any exception that fails this will be wrapped with TransactionException and thrown
     * out.
     */
    GlobalStatus commit(String xid) throws TransactionException;

    /**
     * Global rollback.
     *
     * @param xid XID of the global transaction
     * @return Status of the global transaction after rollbacking.
     * @throws TransactionException Any exception that fails this will be wrapped with TransactionException and thrown
     * out.
     */
    GlobalStatus rollback(String xid) throws TransactionException;
	.........
}



3、SeataHandlerInterceptor

public class SeataHandlerInterceptor implements HandlerInterceptor {

   private static final Logger log = LoggerFactory
         .getLogger(SeataHandlerInterceptor.class);

    /**
     *	从头部中获取xid,绑定到当前线程
    **/
   @Override
   public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
         Object handler) {

      String xid = RootContext.getXID();
       //public static final String KEY_XID = "TX_XID";
      String rpcXid = request.getHeader(RootContext.KEY_XID);
      if (log.isDebugEnabled()) {
         log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid);
      }

      if (xid == null && rpcXid != null) {
         RootContext.bind(rpcXid);
         if (log.isDebugEnabled()) {
            log.debug("bind {} to RootContext", rpcXid);
         }
      }
      return true;
   }

    /**
     *	业务执行完后,删除
    **/
   @Override
   public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
         Object handler, Exception e) {

      String rpcXid = request.getHeader(RootContext.KEY_XID);

      if (StringUtils.isEmpty(rpcXid)) {
         return;
      }

      String unbindXid = RootContext.unbind();
      if (log.isDebugEnabled()) {
         log.debug("unbind {} from RootContext", unbindXid);
      }
      if (!rpcXid.equalsIgnoreCase(unbindXid)) {
         log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
         if (unbindXid != null) {
            RootContext.bind(unbindXid);
            log.warn("bind {} back to RootContext", unbindXid);
         }
      }
   }

}



4、RM(ResourceManager)

在这里插入图片描述



三、TM相关流程源码

​ 我们上面简单介绍了seata分布式事务的简单流程,下面我们就来具体分析下源码的流转。将上面的各个应用启动后,我们调用

http://localhost:8084/purchase/commit

,正式执行我们的业务流程。



1、拦截对应的使用了全局事务的方法

​ 我们在

purchase(...)

方法使用的

@GlobalTransactional

注解,就会有对应的拦截器来解析对应的逻辑增强,加入对应的全局事务。

在这里插入图片描述

​ 如果没有对应全局事务的标签,就不处理,直接通过

methodInvocation.proceed()

执行对应的方法。我们这里是有全局事务的,就通过

handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation)

来开始对应的逻辑处理。



2、处理全局事务

Object handleGlobalTransaction(final MethodInvocation methodInvocation,
    final GlobalTransactional globalTrxAnno) throws Throwable {
    boolean succeed = true;
    try {
        return transactionalTemplate.execute(new TransactionalExecutor() {
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();
            }
			.......
            @Override
            public TransactionInfo getTransactionInfo() {
                // reset the value of timeout
                int timeout = globalTrxAnno.timeoutMills();
                if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                    timeout = defaultGlobalTransactionTimeout;
                }
				// 这个是本次事务的对应信息、例如传播级别、超时时间
                TransactionInfo transactionInfo = new TransactionInfo();
                transactionInfo.setTimeOut(timeout);
                transactionInfo.setName(name());
                transactionInfo.setPropagation(globalTrxAnno.propagation());
                transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
                transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
               	.........
                return transactionInfo;
            }
        });
    } catch (TransactionalExecutor.ExecutionException e) {
        	.........
    } finally {
        if (degradeCheck) {
            EVENT_BUS.post(new DegradeCheckEvent(succeed));
        }
    }
}

​ 这里的逻辑主要是给到

TransactionalTemplate

来处理对应的逻辑。



3、全局事务的整个流程逻辑

public class TransactionalTemplate {
		..........
    public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. Get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
        GlobalTransaction tx = GlobalTransactionContext.getCurrent();

        // 1.2 Handle the transaction propagation.
        Propagation propagation = txInfo.getPropagation();
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            switch (propagation) {
                case NOT_SUPPORTED:
                    // If transaction is existing, suspend it.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                    }
                    // Execute without transaction and return.
                    return business.execute();
                case REQUIRES_NEW:
                    // If transaction is existing, suspend it, and then begin new transaction.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                        tx = GlobalTransactionContext.createNew();
                    }
                    // Continue and execute with new transaction
                    break;
               			............
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }

            // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
            if (tx == null) {
                tx = GlobalTransactionContext.createNew();
            }

            // set current tx config to holder
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

            try {
              		..........
                return rs;
            } finally {
                //5. clear
                resumeGlobalLockConfig(previousConfig);
                triggerAfterCompletion();
                cleanUp();
            }
        } finally {
            // If the transaction is suspended, resume it.
            if (suspendedResourcesHolder != null) {
                tx.resume(suspendedResourcesHolder);
            }
        }
    }

​ 这里其实主要分为两部分:

​ 一部分是分布式事务其本身也是一个事务,所以前面的

switch (propagation)

里面的逻辑,就是遵循spring本身的事务传播机制。例如如果是

NOT_SUPPORTED

,其不需要事务,就会将原来的事务挂起,然后直接执行对应的业务,而如果是

REQUIRES_NEW

的话,其就需要先将原来的事务挂起,再开启一个新事务来继续处理往下的逻辑。

​ 另一部分就是具体的分布式事务的处理了。

 // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
    if (tx == null) {
        tx = GlobalTransactionContext.createNew();
    }

    // set current tx config to holder
    GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

    try {
        // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
        //    else do nothing. Of course, the hooks will still be triggered.
        beginTransaction(txInfo, tx);

        Object rs;
        try {
            // Do Your Business
            rs = business.execute();
        } catch (Throwable ex) {
            // 3. The needed business exception to rollback.
            completeTransactionAfterThrowing(txInfo, tx, ex);
            throw ex;
        }

        // 4. everything is fine, commit.
        commitTransaction(tx);

        return rs;
    } finally {
        //5. clear
        resumeGlobalLockConfig(previousConfig);
        triggerAfterCompletion();
        cleanUp();
    }
} finally {
    // If the transaction is suspended, resume it.
    if (suspendedResourcesHolder != null) {
        tx.resume(suspendedResourcesHolder);
    }
}



1、获取DefaultGlobalTransaction

​ 这里首先就是通过

GlobalTransactionContext.createNew()

获取

DefaultGlobalTransaction

public static GlobalTransaction createNew() {
    return new DefaultGlobalTransaction();
}
DefaultGlobalTransaction() {
    this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
}

/**
 * Instantiates a new Default global transaction.
 *
 * @param xid    the xid
 * @param status the status
 * @param role   the role
 */
DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {
    this.transactionManager = TransactionManagerHolder.get();
    this.xid = xid;
    this.status = status;
    this.role = role;
}

​ 这个类里面可以获取到

TransactionManager

事务管理器,也就是

TM



transactionManager

是单例模式创建的。



2、开启事务(调用TC获取xid)

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {
        triggerBeforeBegin();
        tx.begin(txInfo.getTimeOut(), txInfo.getName());
        triggerAfterBegin();
    } catch (TransactionException txe) {
        throw new TransactionalExecutor.ExecutionException(tx, txe,
            TransactionalExecutor.Code.BeginFailure);

    }
}

​ 然后我们会开启全局事务的流程,也就是首先获取

xid

。本次全局事务的状态设置为

GlobalStatus.Begin

,然后将xid绑定到当前线程上面。

public void begin(int timeout, String name) throws TransactionException {
    if (role != GlobalTransactionRole.Launcher) {
        assertXIDNotNull();
       	........
        return;
    }
    assertXIDNull();
    String currentXid = RootContext.getXID();
    if (currentXid != null) {
        throw new IllegalStateException("Global transaction already exists," +
            " can't begin a new global transaction, currentXid = " + currentXid);
    }
    xid = transactionManager.begin(null, null, name, timeout);
    status = GlobalStatus.Begin;
    RootContext.bind(xid);
}



3、执行业务(成功则提交、失败则回滚)

​ 我们获取到

xid

开始全局事务后,就执行我们的业务。

beginTransaction(txInfo, tx);

Object rs;
try {
    // Do Your Business
    rs = business.execute();
} catch (Throwable ex) {
    // 3. The needed business exception to rollback.
    completeTransactionAfterThrowing(txInfo, tx, ex);
    throw ex;
}

// 4. everything is fine, commit.
commitTransaction(tx);



4、执行成功TM全局事务提交

private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {
        triggerBeforeCommit();
        tx.commit();
        triggerAfterCommit();
    } catch (TransactionException txe) {
        // 4.1 Failed to commit
        throw new TransactionalExecutor.ExecutionException(tx, txe,
            TransactionalExecutor.Code.CommitFailure);
    }
}
public void commit() throws TransactionException {
    if (role == GlobalTransactionRole.Participant) {
        // Participant has no responsibility of committing
        return;
    }
    assertXIDNotNull();
    int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
    try {
        while (retry > 0) {
            try {
                status = transactionManager.commit(xid);
                break;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                retry--;
                if (retry == 0) {
                    throw new TransactionException("Failed to report global commit", ex);
                }
            }
        }
    } finally {
        if (xid.equals(RootContext.getXID())) {
            suspend();
        }
    }
}

​ 这里我们可以看到当业务正常完成后,其会通过TM(TransactionManager)去提交事务,也就是调TC提交本次全局事务,让TC去通知各个RM本次事务已正常完成。

public GlobalStatus commit(String xid) throws TransactionException {
    GlobalCommitRequest globalCommit = new GlobalCommitRequest();
    globalCommit.setXid(xid);
    GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
    return response.getGlobalStatus();
}



5、执行失败TM全局事务回滚

private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
    triggerBeforeRollback();
    tx.rollback();
    triggerAfterRollback();
    // 3.1 Successfully rolled back
    throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
        ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
}
public void rollback() throws TransactionException {
    if (role == GlobalTransactionRole.Participant) {
        return;
    }
    assertXIDNotNull();

    int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
    try {
        while (retry > 0) {
            try {
                status = transactionManager.rollback(xid);
                break;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                retry--;
                if (retry == 0) {
                    throw new TransactionException("Failed to report global rollback", ex);
                }
            }
        }
    } finally {
        if (xid.equals(RootContext.getXID())) {
            suspend();
        }
    }
}

​ 这个与提交一样的逻辑,只是这里调用的是TM的回滚接口。



四、RM相关流程源码

在这里插入图片描述

​ 我们再回到这张图片,以及对应的方法调用:

@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
    stockFeignClient.deduct(commodityCode, orderCount);

    orderFeignClient.create(userId, commodityCode, orderCount);

    if (!validData()) {
        throw new RuntimeException("账户或库存不足,执行回滚");
    }
}

​ 我们前面梳理了,TM开启事务主要是通过拦截器在调用

purchase(...)

进行对应全局事务开始处理,然后在

purchase(...)

完成后,再通过拦截器来解析TM的提交或者回滚。然后RM的逻辑有两个,一个是在

purchase(...)

方法通过feign调用

stock



order

是,其的处理,另一个是TC来解析全局事务提交、或者回滚的调用。



1、purchase业务方法调用stockFeignClient.deduct

@FeignClient(name = "stock-service", url = "127.0.0.1:8081")
public interface StockFeignClient {

    @GetMapping("/deduct")
    void deduct(@RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count);

}

​ 这里是通过feign去调用到stock服务,然后seata在这里做了个处理,也是代理模式,代理本身的feign

Client

,然后增强,也就是将

xid

加到

header

里面:

public class SeataFeignClient implements Client {

   private final Client delegate;
   private final BeanFactory beanFactory;
   private static final int MAP_SIZE = 16;

   SeataFeignClient(BeanFactory beanFactory) {
      this.beanFactory = beanFactory;
      this.delegate = new Client.Default(null, null);
   }
	.........

   @Override
   public Response execute(Request request, Request.Options options) throws IOException {

      Request modifiedRequest = getModifyRequest(request);
      return this.delegate.execute(modifiedRequest, options);
   }

   private Request getModifyRequest(Request request) {

      String xid = RootContext.getXID();
	 .........
      Map<String, Collection<String>> headers = new HashMap<>(MAP_SIZE);
      headers.putAll(request.headers());

      List<String> fescarXid = new ArrayList<>();
      fescarXid.add(xid);
      headers.put(RootContext.KEY_XID, fescarXid);

      return Request.create(request.method(), request.url(), headers, request.body(),
            request.charset());
   }

}



2、stock服务取xid(SeataHandlerInterceptor)

​ 在business服务调用时添加了

xid

到同步中,在

stock

肯定会取出来,其的获取也是拓展spring的

HandlerInterceptor

接口。

在这里插入图片描述

public class SeataHandlerInterceptor implements HandlerInterceptor {

   private static final Logger log = LoggerFactory
         .getLogger(SeataHandlerInterceptor.class);

   @Override
   public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
         Object handler) {

      String xid = RootContext.getXID();
    	........
      if (xid == null && rpcXid != null) {
         RootContext.bind(rpcXid);
      }
      return true;
   }

   @Override
   public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
         Object handler, Exception e) {

      String rpcXid = request.getHeader(RootContext.KEY_XID);

      if (StringUtils.isEmpty(rpcXid)) {
         return;
      }
      String unbindXid = RootContext.unbind();
  
      if (!rpcXid.equalsIgnoreCase(unbindXid)) {
         if (unbindXid != null) {
            RootContext.bind(unbindXid);
         }
      }
   }

}

​ 这里就是在

DispatcherServlet

调用实际

Controller

业务方法其重头部中获取然后绑定到当前线程中,在业务方法完成后再

unbind

。在这里处理后,就会进入到

controller

的业务逻辑中。



3、stock的DB相关逻辑(主要织入分支注册、undo日志记录)

在这里插入图片描述

​ 然后就就会进行DB操作,同时我们可以看到,

Seata

的在这里也是通过代理模式处理了

DataSource

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
    ..........
	@Override
    public ConnectionProxy getConnection() throws SQLException {
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection);
    }

​ 主要的逻辑增强处理就是在

ConnectionProxy

中。并且seata的处理不单是这个代理了,还有

PreparedStatement

这些,我们在这里就不是梳理其的具体流转了,有点绕。我们自己看

seata

在进行DB操作前做了对应的哪些处理。



1)、判断处理是否设置自动提交

public T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    if (connectionProxy.getAutoCommit()) {
        return executeAutoCommitTrue(args);
    } else {
        return executeAutoCommitFalse(args);
    }
}

​ 在执行前,判断是否为自动提交,如果为自动提交,是的话调用executeAutoCommitTrue,这个方法主要是将自动提交设置为不自动提交,然后调用executeAutoCommitFalse(args),再进行提交connectionProxy.commit():

protected T executeAutoCommitTrue(Object[] args) throws Throwable {
    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    try {
        connectionProxy.changeAutoCommit();
        return new LockRetryPolicy(connectionProxy).execute(() -> {
            T result = executeAutoCommitFalse(args);
            connectionProxy.commit();
            return result;
        });
    } catch (Exception e) {
        if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
            connectionProxy.getTargetConnection().rollback();
        }
        throw e;
    } finally {
        connectionProxy.getContext().reset();
        connectionProxy.setAutoCommit(true);
    }
}



2)、处理业务执行

protected T executeAutoCommitFalse(Object[] args) throws Exception {
    if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
        throw new NotSupportYetException("multi pk only support mysql!");
    }
    TableRecords beforeImage = beforeImage();
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    TableRecords afterImage = afterImage(beforeImage);
    prepareUndoLog(beforeImage, afterImage);
    return result;
}

​ 这个我们可以看到逻辑,在执行业务sql前获取修改前的逻辑,执行后以及获取修改后的数据,保存到

undo

日志

protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
		.........
    TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
    String lockKeys = buildLockKey(lockKeyRecords);
    if (null != lockKeys) {
        connectionProxy.appendLockKey(lockKeys);

        SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
        connectionProxy.appendUndoLog(sqlUndoLog);
    }
}

在这里插入图片描述



3)、DB提交(RM分支事务注册、undo数据记录)

private void processGlobalTransactionCommit() throws SQLException {
    try {
        register();
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }
    try {
        UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
        targetConnection.commit();
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        report(false);
        throw new SQLException(ex);
    }
    if (IS_REPORT_SUCCESS_ENABLE) {
        report(true);
    }
    context.reset();
}

​ 在提交的时候主要有四个处理:



1、作为RM向TC注册分支事务

private void register() throws TransactionException {
    if (!context.hasUndoLog() || !context.hasLockKey()) {
        return;
    }
    Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
        null, context.getXid(), null, context.buildLockKeys());
    context.setBranchId(branchId);
}
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
    try {
        BranchRegisterRequest request = new BranchRegisterRequest();
        request.setXid(xid);
        request.setLockKey(lockKeys);
        request.setResourceId(resourceId);
        request.setBranchType(branchType);
        request.setApplicationData(applicationData);

        BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));
        }
        return response.getBranchId();
    } catch (TimeoutException toe) {
        throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
    } catch (RuntimeException rex) {
        throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);
    }
}

​ 这个就是告诉TC直接是本次全局事务的一个分支事务。然后TC收到后就会在

branch_table

中添加一条记录。



2、undo表记录插入

public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
    ConnectionContext connectionContext = cp.getContext();
    if (!connectionContext.hasUndoLog()) {
        return;
    }
    String xid = connectionContext.getXid();
    long branchId = connectionContext.getBranchId();
    BranchUndoLog branchUndoLog = new BranchUndoLog();
    branchUndoLog.setXid(xid);
    branchUndoLog.setBranchId(branchId);
    branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
    UndoLogParser parser = UndoLogParserFactory.getInstance();
    byte[] undoLogContent = parser.encode(branchUndoLog);
		.........
    insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName(), compressorType), undoLogContent, cp.getTargetConnection());
}
protected void insertUndoLogWithNormal(String xid, long branchId, String rollbackCtx, byte[] undoLogContent,
                                       Connection conn) throws SQLException {
    insertUndoLog(xid, branchId, rollbackCtx, undoLogContent, State.Normal, conn);
}
private void insertUndoLog(String xid, long branchId, String rollbackCtx, byte[] undoLogContent,
                           State state, Connection conn) throws SQLException {
    try (PreparedStatement pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL)) {
        pst.setLong(1, branchId);
        pst.setString(2, xid);
        pst.setString(3, rollbackCtx);
        pst.setBytes(4, undoLogContent);
        pst.setInt(5, state.getValue());
        pst.executeUpdate();
    } catch (Exception e) {
        if (!(e instanceof SQLException)) {
            e = new SQLException(e);
        }
        throw (SQLException) e;
    }
}



3、通过Connection正式进行本地事务提交



4、作为RM向TC进行本地事务完成状态的提交

private void report(boolean commitDone) throws SQLException {
    if (context.getBranchId() == null) {
        return;
    }
    int retry = REPORT_RETRY_COUNT;
    while (retry > 0) {
        try {
            DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(),
                commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed, null);
            return;
        } catch (Throwable ex) {
            retry--;
            if (retry == 0) {
                throw new SQLException("Failed to report branch status " + commitDone, ex);
            }
        }
    }
}

​ 可以看到,我们如果完成的话,就报告为

BranchStatus.PhaseOne_Done

,也就是本地分支事务完成提交,如果失败就是

BranchStatus.PhaseOne_Failed



4、本地分支事务的提交或回滚

​ 如果全局事务的各个分支正常完成,然后

TM

就会向

TC

进行全局事务的提交,然后

TC

就通过

xid

获取当前全局事务下面的各个分支,让它们进行提交,回滚从流程上来说是类似的,所以我们本次就梳理

commit

的流程。



1、提交

​ seata在调用是使用的

nio

,我们直接进入到具体的处理。

在这里插入图片描述

​ 这里是我们

RM

被调用提交:

在这里插入图片描述

public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    return asyncWorker.branchCommit(xid, branchId, resourceId);
}

​ 可以看到其在

ResourceManager

中是异步处理的。

public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
    Phase2Context context = new Phase2Context(xid, branchId, resourceId);
    addToCommitQueue(context);
    return BranchStatus.PhaseTwo_Committed;
}

​ 添加到异步提交队列后就直接返回

BranchStatus.PhaseTwo_Committed

,也就是二阶段提交完成。

​ 那关于第二阶段完成的具体处理是什么呢?我们知道具体数据库的数据其实在第一阶段以及做了修改(并做了undo备份),所以二阶段的处理,不用处理数据部分了,只要将次

xid

对应的

undo

记录删除就可以了。

private void doBranchCommit() {
  	............
    groupedContexts.forEach(this::dealWithGroupedContexts);
}
private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) {
   	...........
    Connection conn;
    try {
        conn = dataSourceProxy.getPlainConnection();
    } catch (SQLException sqle) {
        return;
    }
    UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());
	..........
    splitByLimit.forEach(partition -> deleteUndoLog(conn, undoLogManager, partition));
}
private void deleteUndoLog(Connection conn, UndoLogManager undoLogManager, List<Phase2Context> contexts) {
    Set<String> xids = new LinkedHashSet<>(contexts.size());
 		.............
    try {
        undoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
        if (!conn.getAutoCommit()) {
            conn.commit();
        }
    } catch (SQLException e) {
        LOGGER.error("Failed to batch delete undo log", e);
        try {
            conn.rollback();
        } catch (SQLException rollbackEx) {
            LOGGER.error("Failed to rollback JDBC resource after deleting undo log failed", rollbackEx);
        }
    } finally {
        try {
            conn.close();
        } catch (SQLException closeEx) {
            LOGGER.error("Failed to close JDBC resource after deleting undo log", closeEx);
        }
    }
}



2、回滚

​ 关于回滚,其主要就是根据

undo

查询恢复数据的提交

public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    return asyncWorker.branchCommit(xid, branchId, resourceId);
}

@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    if (dataSourceProxy == null) {
        throw new ShouldNeverHappenException();
    }
    try {
        UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
    } catch (TransactionException te) {
        StackTraceLogger.info(LOGGER, te,
            "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
            new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;

}

​ 我们可以看到这个,就是恢复到前面的数据,具体里面的处理我们就不梳理了。

UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);



版权声明:本文为qq_25179481原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。