I have previously written about the principles behind Seata's transaction modes and walked through a demo of the AT mode.
Today let's go through Seata's workflow together with the source code.
Configuration files
I have also covered how Seata's configuration center is implemented: https://blog.csdn.net/weixin_45661382/article/details/105539999
Here let's look at the configuration content itself, taking the default file type as an example.
file.conf contains three parts:
1. transport
The transport section configures Netty; it maps to NettyBaseConfig, NettyServerConfig and NettyClientConfig under the io.seata.core.rpc.netty package. Communication between client and server goes over Netty.
2. service
Effective on the client side only:
# service configuration, only used in client side
service {
#transaction service group mapping
#my_test_tx_group ---> the custom distributed transaction group name; default ---> the cluster name defined in the registry of registry.conf, e.g. the application name in Eureka
vgroupMapping.my_test_tx_group = "default"
#only support when registry.type=file, please don't set multiple addresses
#this entry is ignored in db mode
default.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
#whether Seata's distributed transaction is enabled
disableGlobalTransaction = false
}
(The disableGlobalTransaction switch is read on the client side by io.seata.spring.annotation.GlobalTransactionScanner and, as shown later, by GlobalTransactionalInterceptor.)
3. client
Effective on the client side only:
#client transaction configuration, only used in client side
client {
rm {
#upper bound of the async commit buffer after RM receives the commit notification from TC
asyncCommitBufferLimit = 10000
lock {
#retry interval, in milliseconds, for checking or acquiring the global lock; default 10
retryInterval = 10
#retry times for checking or acquiring the global lock; default 30
retryTimes = 30
#lock strategy when a branch transaction conflicts with another global transaction that is rolling back; default true, i.e. release the local lock first so the rollback can succeed
retryPolicyBranchRollbackOnConflict = true
}
#retry times for reporting the phase-one result to TC; default 5
reportRetryCount = 5
#automatically refresh the cached table metadata; default false
tableMetaCheckEnable = false
#whether to report phase-one success (true/false); default false since 1.1.0. true keeps the branch transaction lifecycle records complete, false noticeably improves performance
reportSuccessEnable = false
sqlParserType = druid
}
tm {
#retry times for reporting the global commit result to TC; default 1, a value greater than 1 is recommended
commitRetryCount = 5
#retry times for reporting the global rollback result to TC; default 1, a value greater than 1 is recommended
rollbackRetryCount = 5
}
undo {
#phase-two rollback image validation; default true (enabled), false disables it
dataValidation = true
#undo log serialization format; default jackson
logSerialization = "jackson"
#custom undo log table name; default undo_log
logTable = "undo_log"
}
log {
#exception logging rate; default 100, currently used when an undo rollback fails: the stack trace is printed with a probability of 1/100, since a failed rollback is almost always dirty data and printing every stack trace would only waste disk space
exceptionRate = 100
}
}
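On the client side these entries are read through Seata's Configuration abstraction. As a small orientation sketch (illustrative only; the key constant below is the service.disableGlobalTransaction switch that GlobalTransactionalInterceptor checks later in this article):
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import io.seata.core.constants.ConfigurationKeys;

public class SeataConfigDemo {
    public static void main(String[] args) {
        // singleton Configuration built from registry.conf / file.conf (or another config center)
        Configuration config = ConfigurationFactory.getInstance();
        // same switch that GlobalTransactionalInterceptor checks before handling @GlobalTransactional
        boolean disabled = config.getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, false);
        System.out.println("disableGlobalTransaction = " + disabled);
    }
}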
Seata bootstrap entry point
Seata's bootstrap entry point is io.seata.spring.boot.autoconfigure.SeataAutoConfiguration:
@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@Configuration
@EnableConfigurationProperties({SeataProperties.class})
public class SeataAutoConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);
@Bean(BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER)
@ConditionalOnMissingBean(name = {BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER})
public SpringApplicationContextProvider springApplicationContextProvider() {
return new SpringApplicationContextProvider();
}
@Bean(BEAN_NAME_FAILURE_HANDLER)
@ConditionalOnMissingBean(FailureHandler.class)
public FailureHandler failureHandler() {
return new DefaultFailureHandlerImpl();
}
@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Automatically configure Seata");
}
return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
}
@Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = {"enableAutoDataSourceProxy", "enable-auto-data-source-proxy"}, havingValue = "true", matchIfMissing = true)
@ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),seataProperties.getExcludesForAutoProxying());
}
}
Here SeataProperties is the configuration properties class that supplies the transaction group name, i.e. it reads the following property:
spring.cloud.alibaba.fescar.tx-service-group=my_test_tx_group
io.seata.spring.boot.autoconfigure.properties.SeataProperties.getTxServiceGroup:
public String getTxServiceGroup() {
if (null == txServiceGroup) {
txServiceGroup = springCloudAlibabaConfiguration.getTxServiceGroup();
}
return txServiceGroup;
}
io.seata.spring.boot.autoconfigure.properties.SpringCloudAlibabaConfiguration.getTxServiceGroup:
public String getTxServiceGroup() {
if (null == txServiceGroup) {
String applicationId = getApplicationId();
if (null == applicationId) {
LOGGER.warn("{} is null, please set its value", SPRING_APPLICATION_NAME_KEY);
}
txServiceGroup = applicationId + DEFAULT_SPRING_CLOUD_SERVICE_GROUP_POSTFIX;
}
return txServiceGroup;
}
If it is not configured, a name is generated as spring.application.name + "-seata-service-group", so startup will report an error if spring.application.name is not configured either.
With applicationId and txServiceGroup in hand, an io.seata.spring.annotation.GlobalTransactionScanner object is created; the interesting part is its initClient():
private void initClient() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Initializing Global Transaction Clients ... ");
}
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
}
//init TM
TMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
//init RM
RMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global Transaction Clients are initialized. ");
}
registerSpringShutdownHook();
}
As you can see, both TMClient and RMClient are initialized, so a single service can take the TM role as well as the RM role; which one it plays in a given global transaction depends on where the @GlobalTransactional annotation sits.
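initClient() runs when the GlobalTransactionScanner bean is created, so outside the Spring Boot starter the same effect can be had by declaring the scanner yourself. A minimal sketch (the application id and group name are placeholder values; the two-argument constructor is assumed here):
import io.seata.spring.annotation.GlobalTransactionScanner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SeataClientConfig {
    // creating this bean triggers initClient(), i.e. the TM and RM registrations with the TC
    @Bean
    public GlobalTransactionScanner globalTransactionScanner() {
        return new GlobalTransactionScanner("account-server", "my_test_tx_group");
    }
}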
Creating the clients results in Netty connections to the TC, so the startup log shows two Netty channels, with transactionRole marked as TMROLE and RMROLE respectively:
2020-04-29 10:26:17.478 INFO 17072 --- [imeoutChecker_1] i.s.c.r.netty.NettyClientChannelManager : will connect to 192.168.1.151:18091
2020-04-29 10:26:17.482 INFO 17072 --- [imeoutChecker_1] i.s.core.rpc.netty.NettyPoolableFactory : NettyPool create channel to transactionRole:TMROLE,address:192.168.1.151:18091,msg:< RegisterTMRequest{applicationId='account-server', transactionServiceGroup='ydc_tx_group'} >
2020-04-29 10:26:17.534 INFO 17072 --- [imeoutChecker_2] i.s.c.r.netty.NettyClientChannelManager : will connect to 192.168.1.151:18091
2020-04-29 10:26:17.534 INFO 17072 --- [imeoutChecker_2] io.seata.core.rpc.netty.RmRpcClient : RM will register :jdbc:mysql://127.0.0.1:3306/seata-account
2020-04-29 10:26:17.535 INFO 17072 --- [imeoutChecker_2] i.s.core.rpc.netty.NettyPoolableFactory : NettyPool create channel to transactionRole:RMROLE,address:192.168.1.151:18091,msg:< RegisterRMRequest{resourceIds='jdbc:mysql://127.0.0.1:3306/seata-account', applicationId='account-server', transactionServiceGroup='ydc_tx_group'} >
2020-04-29 10:26:17.904 WARN 17072 --- [lector_RMROLE_1] i.s.common.loader.EnhancedServiceLoader : load [io.seata.serializer.hessian.HessianSerializer] class fail. com/caucho/hessian/io/AbstractHessianOutput
2020-04-29 10:26:17.905 INFO 17072 --- [lector_RMROLE_1] i.s.common.loader.EnhancedServiceLoader : load Serializer[SEATA] extension by class[io.seata.serializer.seata.SeataSerializer]
2020-04-29 10:26:18.030 INFO 17072 --- [imeoutChecker_2] io.seata.core.rpc.netty.RmRpcClient : register RM success. server version:1.1.0,channel:[id: 0x47882342, L:/192.168.1.192:49367 - R:/192.168.1.151:18091]
2020-04-29 10:26:18.040 INFO 17072 --- [imeoutChecker_1] i.s.core.rpc.netty.NettyPoolableFactory : register success, cost 195 ms, version:1.1.0,role:TMROLE,channel:[id: 0x46ae7e39, L:/192.168.1.192:49368 - R:/192.168.1.151:18091]
2020-04-29 10:26:18.040 INFO 17072 --- [imeoutChecker_2] i.s.core.rpc.netty.NettyPoolableFactory : register success, cost 205 ms, version:1.1.0,role:RMROLE,channel:[id: 0x47882342, L:/192.168.1.192:49367 - R:/192.168.1.151:18091]
TM processing flow
One of the TM's jobs is to open the global transaction; in practice you put the @GlobalTransactional annotation on the method that should open it.
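For orientation, a minimal usage sketch (OrderService and the two downstream calls are made-up names; the attribute names match those read by the interceptor below):
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.stereotype.Service;

@Service
public class OrderService {

    // This method opens the global transaction, so this service acts as TM (Launcher);
    // the services it calls join as RM participants.
    @GlobalTransactional(name = "create-order", timeoutMills = 60000, rollbackFor = Exception.class)
    public void createOrder(String userId, String commodityCode, int count) {
        // 1. insert the order locally (a branch transaction on this service's data source)
        // 2. call the storage service to deduct stock, e.g. storageClient.deduct(commodityCode, count)
        // 3. call the account service to debit money, e.g. accountClient.debit(userId, count)
    }
}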
Behind the annotation sits an interceptor, io.seata.spring.annotation.GlobalTransactionalInterceptor:
//excerpt
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionalInterceptor.class);
private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();
private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
private final GlobalLockTemplate<Object> globalLockTemplate = new GlobalLockTemplate<>();
private final FailureHandler failureHandler;
private volatile boolean disable;
/**
* Instantiates a new Global transactional interceptor.
*
* @param failureHandler the failure handler
*/
public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
DEFAULT_DISABLE_GLOBAL_TRANSACTION);
}
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis())
: null;
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
if (!disable && globalTransactionalAnnotation != null) {
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (!disable && globalLockAnnotation != null) {
return handleGlobalLock(methodInvocation);
} else {
return methodInvocation.proceed();
}
}
}
If Seata's distributed transaction is enabled and the @GlobalTransactional annotation is present, handleGlobalTransaction() is executed:
io.seata.spring.annotation.GlobalTransactionalInterceptor.handleGlobalTransaction()
private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final GlobalTransactional globalTrxAnno) throws Throwable {
try {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
public String name() {
String name = globalTrxAnno.name();
if (!StringUtils.isNullOrEmpty(name)) {
return name;
}
return formatMethod(methodInvocation.getMethod());
}
@Override
public TransactionInfo getTransactionInfo() {
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
transactionInfo.setName(name());
transactionInfo.setPropagation(globalTrxAnno.propagation());
Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.rollbackForClassName()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
}
});
} catch (TransactionalExecutor.ExecutionException e) {
TransactionalExecutor.Code code = e.getCode();
switch (code) {
case RollbackDone:
throw e.getOriginalException();
case BeginFailure:
failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case CommitFailure:
failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackFailure:
failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackRetrying:
failureHandler.onRollbackRetrying(e.getTransaction(), e.getCause());
throw e.getCause();
default:
throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
}
}
}
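The catch block above delegates begin/commit/rollback failures to the FailureHandler. Because SeataAutoConfiguration only registers DefaultFailureHandlerImpl when no FailureHandler bean exists, you can provide your own, for example to raise alerts. A rough sketch (the logging calls stand in for whatever alerting you need; the callback set mirrors the cases handled above):
import io.seata.tm.api.FailureHandler;
import io.seata.tm.api.GlobalTransaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AlertingFailureHandler implements FailureHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(AlertingFailureHandler.class);

    @Override
    public void onBeginFailure(GlobalTransaction tx, Throwable cause) {
        LOGGER.error("failed to begin global transaction, xid={}", tx.getXid(), cause);
    }

    @Override
    public void onCommitFailure(GlobalTransaction tx, Throwable cause) {
        LOGGER.error("failed to commit global transaction, xid={}", tx.getXid(), cause);
    }

    @Override
    public void onRollbackFailure(GlobalTransaction tx, Throwable cause) {
        LOGGER.error("failed to roll back global transaction, xid={}", tx.getXid(), cause);
    }

    @Override
    public void onRollbackRetrying(GlobalTransaction tx, Throwable cause) {
        LOGGER.warn("global transaction rollback is retrying, xid={}", tx.getXid(), cause);
    }
}
Exposing it as a Spring bean is enough; the @ConditionalOnMissingBean condition in the auto-configuration then skips the default handler.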
TransactionalTemplate defines the standard steps the TM follows for a global transaction:
io.seata.tm.api.TransactionalTemplate.execute()
public Object execute(TransactionalExecutor business) throws Throwable {
// 1 get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 1.2 Handle the Transaction propatation and the branchType
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) {
case NOT_SUPPORTED:
suspendedResourcesHolder = tx.suspend(true);
return business.execute();
case REQUIRES_NEW:
suspendedResourcesHolder = tx.suspend(true);
break;
case SUPPORTS:
if (!existingTransaction()) {
return business.execute();
}
break;
case REQUIRED:
break;
case NEVER:
if (existingTransaction()) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s"
,RootContext.getXID()));
} else {
return business.execute();
}
case MANDATORY:
if (!existingTransaction()) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
try {
// 2. begin transaction
beginTransaction(txInfo, tx);
Object rs = null;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3.the needed business exception to rollback.
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. everything is fine, commit.
commitTransaction(tx);
return rs;
} finally {
//5. clear
triggerAfterCompletion();
cleanUp();
}
} finally {
tx.resume(suspendedResourcesHolder);
}
}
The call GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); is where the role in the global transaction is decided:
//io.seata.tm.api.GlobalTransactionContext.getCurrentOrCreate()
public static GlobalTransaction getCurrentOrCreate() {
GlobalTransaction tx = getCurrent();
if (tx == null) {
return createNew();
}
return tx;
}
//io.seata.tm.api.GlobalTransactionContext.getCurrent()
private static GlobalTransaction getCurrent() {
String xid = RootContext.getXID();
if (xid == null) {
return null;
}
return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
}
//io.seata.tm.api.GlobalTransactionContext.createNew()
private static GlobalTransaction createNew() {
return new DefaultGlobalTransaction();
}
//io.seata.tm.api.DefaultGlobalTransaction.DefaultGlobalTransaction()
DefaultGlobalTransaction() {
this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
}
The decision is based on whether an XID already exists in the current context: no XID means Launcher (the initiator), an existing XID means Participant.
Opening the global transaction:
io.seata.tm.api.TransactionalTemplate.beginTransaction()
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeBegin();
tx.begin(txInfo.getTimeOut(), txInfo.getName());
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
}
io.seata.tm.api.DefaultGlobalTransaction.begin()
@Override
public void begin(int timeout, String name) throws TransactionException {
if (role != GlobalTransactionRole.Launcher) {
assertXIDNotNull();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNull();
if (RootContext.getXID() != null) {
throw new IllegalStateException();
}
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
RootContext.bind(xid);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction [{}]", xid);
}
}
A Participant simply returns; the Launcher actually opens the transaction and obtains the XID returned by the TC:
xid = transactionManager.begin(null, null, name, timeout);
io.seata.tm.DefaultTransactionManager is responsible for the TM-to-TC communication, sending the begin, commit and rollback commands:
public class DefaultTransactionManager implements TransactionManager {
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
return response.getXid();
}
@Override
public GlobalStatus commit(String xid) throws TransactionException {
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setXid(xid);
GlobalCommitResponse response = (GlobalCommitResponse)syncCall(globalCommit);
return response.getGlobalStatus();
}
@Override
public GlobalStatus rollback(String xid) throws TransactionException {
GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
globalRollback.setXid(xid);
GlobalRollbackResponse response = (GlobalRollbackResponse)syncCall(globalRollback);
return response.getGlobalStatus();
}
@Override
public GlobalStatus getStatus(String xid) throws TransactionException {
GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
queryGlobalStatus.setXid(xid);
GlobalStatusResponse response = (GlobalStatusResponse)syncCall(queryGlobalStatus);
return response.getGlobalStatus();
}
@Override
public GlobalStatus globalReport(String xid, GlobalStatus globalStatus) throws TransactionException {
GlobalReportRequest globalReport = new GlobalReportRequest();
globalReport.setXid(xid);
globalReport.setGlobalStatus(globalStatus);
GlobalReportResponse response = (GlobalReportResponse) syncCall(globalReport);
return response.getGlobalStatus();
}
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
try {
return (AbstractTransactionResponse)TmRpcClient.getInstance().sendMsgWithResponse(request);
} catch (TimeoutException toe) {
throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
}
}
}
At this point the XID returned by the TC has been obtained and a global transaction is open. Once the global transaction has been created, business.execute() runs, i.e. our business code, and we enter the RM processing flow.
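Put differently, what the annotation drives is the programmatic TM API. A condensed sketch of the same begin/commit/rollback sequence (illustrative only; the annotation-driven template above additionally handles propagation, rollback rules, hooks and the FailureHandler):
import io.seata.tm.api.GlobalTransaction;
import io.seata.tm.api.GlobalTransactionContext;

public class ManualTmFlow {
    public void doBusiness() throws Exception {
        // Launcher if no XID is bound yet, Participant otherwise
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
        tx.begin(60000, "manual-tx");   // TC returns the XID, which gets bound to RootContext
        try {
            // ... business code: local SQL plus calls to other services ...
            tx.commit();                // TM asks the TC to commit the global transaction
        } catch (Exception ex) {
            tx.rollback();              // TM asks the TC to roll the global transaction back
            throw ex;
        }
    }
}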
RM processing flow
How does an RM join the global transaction? The answer is the Seata data source proxy mentioned earlier: only through DataSourceProxy does Seata get a hook at the moment the business code commits its transaction, and through that hook it reports the RM's result to the TC.
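The proxying is either done automatically by the SeataAutoDataSourceProxyCreator shown in the auto-configuration, or wired by hand. A hand-wired sketch, assuming Druid as the physical pool (the URL reuses the one from the startup log; user name and password are placeholders):
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class DataSourceConfig {

    @Bean
    public DruidDataSource druidDataSource() {
        DruidDataSource ds = new DruidDataSource();
        ds.setUrl("jdbc:mysql://127.0.0.1:3306/seata-account");
        ds.setUsername("root");
        ds.setPassword("root");
        return ds;
    }

    // every Connection handed out by this proxy is a ConnectionProxy,
    // which is what makes the commit() interception below possible
    @Bean
    @Primary
    public DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource) {
        return new DataSourceProxy(druidDataSource);
    }
}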
Because the transaction commit of the business code is proxied by ConnectionProxy, what actually runs when the local transaction commits is ConnectionProxy's commit method:
//excerpt
public class ConnectionProxy extends AbstractConnectionProxy {
@Override
public void commit() throws SQLException {
try {
LOCK_RETRY_POLICY.execute(() -> {
doCommit();
return null;
});
} catch (SQLException e) {
throw e;
} catch (Exception e) {
throw new SQLException(e);
}
}
private void doCommit() throws SQLException {
//if we are inside a global transaction, do the global-transaction commit
//being in a global transaction simply means an XID exists in the current context
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}
private void processLocalCommitWithGlobalLocks() throws SQLException {
checkLock(context.buildLockKeys());
try {
targetConnection.commit();
} catch (Throwable ex) {
throw new SQLException(ex);
}
context.reset();
}
private void processGlobalTransactionCommit() throws SQLException {
try {
//first register the branch with the TC and obtain the branchId it assigns
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
//write the undo log
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
//commit the local transaction; note that the undo log and the business data are written in the same local transaction
targetConnection.commit();
} catch (Throwable ex) {
//report the branch failure to the TC
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
report(false);
throw new SQLException(ex);
}
//report the branch success to the TC
if (IS_REPORT_SUCCESS_ENABLE) {
report(true);
}
context.reset();
}
//register the branch: build the request and send it to the TC over Netty
//the returned branchId is kept in the context
private void register() throws TransactionException {
if (!context.hasUndoLog() || context.getLockKeysBuffer().isEmpty()) {
return;
}
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
null, context.getXid(), null, context.buildLockKeys());
context.setBranchId(branchId);
}
}
The concrete steps are:
1. Receive the XID passed in from business-service
2. Bind the XID to the current context
3. Execute the business SQL
4. Create this RM's Netty connection to the TC
5. Send the branch transaction information to the TC
6. Obtain the branchId returned by the TC
7. Record the undo log data
8. Report the phase-one result of this branch to the TC
9. Unbind the XID from the current context
Steps 1 and 9 are not implemented inside Seata itself; you either implement them yourself or pull in the spring-cloud-alibaba-seata dependency, whose com.alibaba.cloud.seata.web.SeataHandlerInterceptor does the binding and unbinding of the XID to and from the context.
public class SeataHandlerInterceptor implements HandlerInterceptor {
private static final Logger log = LoggerFactory.getLogger(SeataHandlerInterceptor.class);
public SeataHandlerInterceptor() {
}
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
String xid = RootContext.getXID();
String rpcXid = request.getHeader("TX_XID");
if(log.isDebugEnabled()) {
log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid);
}
if(xid == null && rpcXid != null) {
RootContext.bind(rpcXid);
if(log.isDebugEnabled()) {
log.debug("bind {} to RootContext", rpcXid);
}
}
return true;
}
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception e) {
String rpcXid = request.getHeader("TX_XID");
if(!StringUtils.isEmpty(rpcXid)) {
String unbindXid = RootContext.unbind();
if(log.isDebugEnabled()) {
log.debug("unbind {} from RootContext", unbindXid);
}
if(!rpcXid.equalsIgnoreCase(unbindXid)) {
log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
if(unbindXid != null) {
RootContext.bind(unbindXid);
log.warn("bind {} back to RootContext", unbindXid);
}
}
}
}
}
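SeataHandlerInterceptor only covers the receiving side; the caller still has to put the XID into the TX_XID header. spring-cloud-alibaba-seata ships Feign and RestTemplate interceptors that do this, and a hand-rolled equivalent for RestTemplate looks roughly like the following sketch (illustrative, not the library's actual class):
import io.seata.core.context.RootContext;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;

import java.io.IOException;

public class XidPropagationInterceptor implements ClientHttpRequestInterceptor {
    @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body,
                                        ClientHttpRequestExecution execution) throws IOException {
        String xid = RootContext.getXID();
        if (xid != null) {
            // the downstream SeataHandlerInterceptor reads this header and binds the XID
            request.getHeaders().add("TX_XID", xid);
        }
        return execution.execute(request, body);
    }
}
Registering it is a one-liner: restTemplate.getInterceptors().add(new XidPropagationInterceptor()).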
At this point the RM has finished its phase-one work; next comes the phase-two logic.
Transaction commit
When the RM started, it created the Netty connection to the TC; after the TC has collected the reports from each RM, it sends each RM a commit or rollback command.
Let's look at the commit path in detail:
io.seata.rm.AbstractRMHandler.doBranchCommit():
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
throws TransactionException {
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
}
BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
applicationData);
response.setXid(xid);
response.setBranchId(branchId);
response.setBranchStatus(status);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch commit result: " + status);
}
}
It pulls the xid, branchId, resourceId, applicationData and branchType out of the request; branchType is an enum:
public enum BranchType {
/**
* The At.
*/
// AT Branch
AT,
/**
* The TCC.
*/
TCC,
/**
* The SAGA.
*/
SAGA,
/**
* The XA.
*/
XA;
/**
* Get branch type.
*
* @param ordinal the ordinal
* @return the branch type
*/
public static BranchType get(byte ordinal) {
return get((int)ordinal);
}
/**
* Get branch type.
*
* @param ordinal the ordinal
* @return the branch type
*/
public static BranchType get(int ordinal) {
for (BranchType branchType : BranchType.values()) {
if (branchType.ordinal() == ordinal) {
return branchType;
}
}
throw new IllegalArgumentException("Unknown BranchType[" + ordinal + "]");
}
}
Then branchCommit is executed, which puts the XID that needs to be committed into the async commit buffer:
io.seata.rm.datasource.AsyncWorker.branchCommit():
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
LOGGER.warn("Async commit buffer is FULL. Rejected branch [{}/{}] will be handled by housekeeping later.", branchId, xid);
}
return BranchStatus.PhaseTwo_Committed;
}
Since the phase-one local transaction has already been committed, a global commit only needs to delete the corresponding undo_log records asynchronously, which is what the following code does:
io.seata.rm.datasource.AsyncWorker:
//a scheduled task consumes the XIDs buffered for commit
public synchronized void init() {
LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true));
timerExecutor.scheduleAtFixedRate(() -> {
try {
doBranchCommits();
} catch (Throwable e) {
LOGGER.info("Failed at async committing ... {}", e.getMessage());
}
}, 10, 1000 * 1, TimeUnit.MILLISECONDS);
}
private void doBranchCommits() {
if (ASYNC_COMMIT_BUFFER.isEmpty()) {
return;
}
Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
//each run of the scheduled task drains everything currently in ASYNC_COMMIT_BUFFER
//the pending contexts are grouped by resourceId, i.e. the database connection url
//(it shows up in the startup log earlier); grouping is done to handle applications with multiple data sources
while (!ASYNC_COMMIT_BUFFER.isEmpty()) {
Phase2Context commitContext = ASYNC_COMMIT_BUFFER.poll();
List<Phase2Context> contextsGroupedByResourceId = mappedContexts.computeIfAbsent(commitContext.resourceId, k -> new ArrayList<>());
contextsGroupedByResourceId.add(commitContext);
}
for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
Connection conn = null;
DataSourceProxy dataSourceProxy;
try {
try {
DataSourceManager resourceManager = (DataSourceManager) DefaultResourceManager.get()
.getResourceManager(BranchType.AT);
dataSourceProxy = resourceManager.get(entry.getKey());
if (dataSourceProxy == null) {
throw new ShouldNeverHappenException("Failed to find resource on " + entry.getKey());
}
conn = dataSourceProxy.getPlainConnection();
} catch (SQLException sqle) {
LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
continue;
}
List<Phase2Context> contextsGroupedByResourceId = entry.getValue();
Set<String> xids = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
Set<Long> branchIds = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
for (Phase2Context commitContext : contextsGroupedByResourceId) {
xids.add(commitContext.xid);
branchIds.add(commitContext.branchId);
int maxSize = Math.max(xids.size(), branchIds.size());
if (maxSize == UNDOLOG_DELETE_LIMIT_SIZE) {
try {
//delete the corresponding undo_log records
UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(
xids, branchIds, conn);
} catch (Exception ex) {
LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
}
xids.clear();
branchIds.clear();
}
}
if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {
return;
}
try {
UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids,
branchIds, conn);
} catch (Exception ex) {
LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
}
if (!conn.getAutoCommit()) {
conn.commit();
}
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
try {
conn.rollback();
} catch (SQLException rollbackEx) {
LOGGER.warn("Failed to rollback JDBC resource while deleting undo_log ", rollbackEx);
}
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException closeEx) {
LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
}
}
}
}
}
Transaction rollback
A rollback can be triggered in two situations.
1. The branch transaction fails, i.e. the places where io.seata.rm.datasource.ConnectionProxy calls report(false):
private void report(boolean commitDone) throws SQLException {
if (context.getBranchId() == null) {
return;
}
int retry = REPORT_RETRY_COUNT;
while (retry > 0) {
try {
DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(),
commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed, null);
return;
} catch (Throwable ex) {
LOGGER.error("Failed to report [" + context.getBranchId() + "/" + context.getXid() + "] commit done ["
+ commitDone + "] Retry Countdown: " + retry);
retry--;
if (retry == 0) {
throw new SQLException("Failed to report branch status " + commitDone, ex);
}
}
}
}
There are two such places:
private void processGlobalTransactionCommit() throws SQLException {
try {
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
report(false);
throw new SQLException(ex);
}
if (IS_REPORT_SUCCESS_ENABLE) {
report(true);
}
context.reset();
}
@Override
public void rollback() throws SQLException {
targetConnection.rollback();
if (context.inGlobalTransaction() && context.isBranchRegistered()) {
report(false);
}
context.reset();
}
2. The TM catches an exception thrown up from a downstream system, i.e. an exception caught in the method that opened the global transaction with @GlobalTransactional. In the execute template method of TransactionalTemplate shown earlier, the call to business.execute() is wrapped in a catch; on catching, completeTransactionAfterThrowing(txInfo, tx, ex) is invoked and the TM tells the TC that the transaction with this XID must be rolled back.
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable ex) throws TransactionalExecutor.ExecutionException {
//roll back
if (txInfo != null && txInfo.rollbackOn(ex)) {
try {
rollbackTransaction(tx, ex);
} catch (TransactionException txe) {
// Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, ex);
}
} else {
// not roll back on this exception, so commit
commitTransaction(tx);
}
}
private void rollbackTransaction(GlobalTransaction tx, Throwable ex) throws TransactionException, TransactionalExecutor.ExecutionException {
triggerBeforeRollback();
tx.rollback();
triggerAfterRollback();
// 3.1 Successfully rolled back
throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, ex);
}
@Override
public void rollback() throws TransactionException {
if (role == GlobalTransactionRole.Participant) {
//only the Launcher may initiate this rollback
// Participant has no responsibility of rollback
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNotNull();
int retry = ROLLBACK_RETRY_COUNT;
try {
while (retry > 0) {
try {
status = transactionManager.rollback(xid);
break;
} catch (Throwable ex) {
LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
retry--;
if (retry == 0) {
throw new TransactionException("Failed to report global rollback", ex);
}
}
}
} finally {
if (RootContext.getXID() != null && xid.equals(RootContext.getXID())) {
suspend(true);
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("[{}] rollback status: {}", xid, status);
}
}
After the TC aggregates the results it sends a rollback command to the participants; the RM receives this notification in the doBranchRollback method of AbstractRMHandler:
protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
throws TransactionException {
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
}
BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
applicationData);
response.setXid(xid);
response.setBranchId(branchId);
response.setBranchStatus(status);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch Rollbacked result: " + status);
}
}
Then the branchRollback method of io.seata.rm.datasource.DataSourceManager is executed:
@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
DataSourceProxy dataSourceProxy = get(resourceId);
if (dataSourceProxy == null) {
throw new ShouldNeverHappenException();
}
try {
UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
} catch (TransactionException te) {
StackTraceLogger.info(LOGGER, te,
"branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
} else {
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
}
return BranchStatus.PhaseTwo_Rollbacked;
}
Finally the undo method of io.seata.rm.datasource.undo.AbstractUndoLogManager is executed.
The undo flow is:
1. Look up the undo log written in phase one by xid and branchId
2. If it is found, generate the compensating SQL from the data recorded in the undo log and execute it, i.e. restore the data modified in phase one
3. After step 2, delete that undo log record
4. If step 1 finds no undo log, insert one with state GlobalFinished.
The usual reason for not finding it is that the phase-one local transaction failed, so the undo log was never written. Because (xid, branchId) is a unique index, the insert in step 4 prevents a late phase-one commit from succeeding afterwards: the phase-one transaction would then fail, the business data is never committed, and the net effect is the same as a rollback.
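Step 2 is carried out by the UndoExecutor implementations, which are not shown in this article; conceptually, replaying the before image of an UPDATE amounts to something like the simplified sketch below (table and column names are placeholders, the real executors build the statement from the TableMeta and the undo log records):
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class UpdateUndoSketch {
    // restore one row of account_tbl to its before-image values
    public void undoUpdate(Connection conn, long id, String beforeName, int beforeMoney) throws SQLException {
        String sql = "UPDATE account_tbl SET name = ?, money = ? WHERE id = ?";
        try (PreparedStatement pst = conn.prepareStatement(sql)) {
            pst.setString(1, beforeName);
            pst.setInt(2, beforeMoney);
            pst.setLong(3, id);
            pst.executeUpdate();
        }
    }
}
The full undo method that drives this whole flow is shown below: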
@Override
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
Connection conn = null;
ResultSet rs = null;
PreparedStatement selectPST = null;
boolean originalAutoCommit = true;
for (; ; ) {
try {
conn = dataSourceProxy.getPlainConnection();
// The entire undo process should run in a local transaction.
if (originalAutoCommit = conn.getAutoCommit()) {
conn.setAutoCommit(false);
}
// Find UNDO LOG
selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
selectPST.setLong(1, branchId);
selectPST.setString(2, xid);
rs = selectPST.executeQuery();
boolean exists = false;
while (rs.next()) {
exists = true;
// It is possible that the server repeatedly sends a rollback request to roll back
// the same branch transaction to multiple processes,
// ensuring that only the undo_log in the normal state is processed.
int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
if (!canUndo(state)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
}
return;
}
String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
Map<String, String> context = parseContext(contextString);
byte[] rollbackInfo = getRollbackInfo(rs);
String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
: UndoLogParserFactory.getInstance(serializer);
BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
try {
// put serializer name to local
setCurrentSerializer(parser.getName());
List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
if (sqlUndoLogs.size() > 1) {
Collections.reverse(sqlUndoLogs);
}
for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
sqlUndoLog.setTableMeta(tableMeta);
AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
dataSourceProxy.getDbType(), sqlUndoLog);
undoExecutor.executeOn(conn);
}
} finally {
// remove serializer name
removeCurrentSerializer();
}
}
// If undo_log exists, it means that the branch transaction has completed the first phase,
// we can directly roll back and clean the undo_log
// Otherwise, it indicates that there is an exception in the branch transaction,
// causing undo_log not to be written to the database.
// For example, the business processing timeout, the global transaction is the initiator rolls back.
// To ensure data consistency, we can insert an undo_log with GlobalFinished state
// to prevent the local transaction of the first phase of other programs from being correctly submitted.
// See https://github.com/seata/seata/issues/489
if (exists) {
deleteUndoLog(xid, branchId, conn);
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
State.GlobalFinished.name());
}
} else {
insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
State.GlobalFinished.name());
}
}
return;
} catch (SQLIntegrityConstraintViolationException e) {
// Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
}
} catch (Throwable e) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException rollbackEx) {
LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
}
}
throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
branchId, e.getMessage()), e);
} finally {
try {
if (rs != null) {
rs.close();
}
if (selectPST != null) {
selectPST.close();
}
if (conn != null) {
if (originalAutoCommit) {
conn.setAutoCommit(true);
}
conn.close();
}
} catch (SQLException closeEx) {
LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
}
}
}
}
Reference: https://blog.csdn.net/f4761/article/details/89077400