事务是基于数据库本身功能实现控制的,因此Spring并不直接管理事务,而是提供了多种事务管理器,他们将事务管理的职责委托给Hibernate或者JTA等持久化机制所提供的相关平台框架的事务来实现。
相比于传统事务处理的模板式重复的编码过程,Spring提供了灵活的声明式事务使用形式,可以通过极少的代码或者注解形式,通过AOP代理来实现具体的事务操作。
Java编程式事务
Java 中数据源的加载采用的是一种SPI机制,通过服务的提供者发现机制加载对应的实例对象,以MySQL的使用为例,传统的JDBC连接,都需要经历如下代码所示步骤。
public class JdbcTransaction {
public static void main(String[] args) throws ClassNotFoundException, SQLException {
// 导入驱动jar包,注册驱动
Class.forName("com.mysql.jdbc.Driver");
// 获取数据库的连接对象
Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/t_db", "root", "root");
// 定义sql语句
String sql = "select * from u";
// 获取执行sql语句的对象
Statement stat = con.createStatement();
// 执行sql并接收返回结果
ResultSet rs = stat.executeQuery(sql);
// 处理结果
while (rs.next()){
String string = rs.getString(1);
System.out.println("column 1:"+string);
}
con.commit();
// 连接con对象可以设置保存点,也可以回退到具体的保存点
con.setSavepoint();
Savepoint savepoint = con.setSavepoint();
con.rollback(savepoint);
// 释放资源
stat.close();
con.close();
}
}
Spring事务
Spring中对于事务进行了深层次的定义以及封装,接下来围绕Spring中事务的使用形式,核心类及接口,以及传播机制等几个方面展开。
代码示例
Spring中对于的事务的使用进行了简化,可以使用注解,也可以使用简单的代码嵌套进行事务的应用,如下所示。
- 使用@Transaction注解
- 使用PlatformTransactionManager、TransactionDefinition、TransactionStatus对象
- 使用TransactionTemplate对象
使用案例如下,三种方式并无明显区别,后两者可以获取事务执行过程种具体的事务对象,可以进行更加准备的操作和状态的感知,比如回退到某个保存点,使用注解则完全对于事务的管理黑盒化,且存在事务失效的场景。
package com.starsray.learn.transaction.spring;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Resource;
/**
* spring事务 使用示例
*
* @author starsray
* @date 2023/03/06
*/
@Service
public class SpringTransaction {
@Resource
public TransactionTemplate transactionTemplate;
@Resource
public PlatformTransactionManager transactionManager;
/**
* 通过注解实现事务管理
*/
@Transactional(rollbackFor = Exception.class, isolation = Isolation.DEFAULT, propagation = Propagation.REQUIRED)
public void annotationMethod() {
// 无需关注事务操作的具体过程,Spring通过AOP代理拦截@Transaction注解修饰的方法,并根据其中定义的参数,进行事务处理逻辑的生成
}
/**
* 通过模板对象使用事务
*/
public void templateTransaction() {
// 自动注入IoC容器中初始化的事务模板对象,可以通过Bean定义,也可以从XML加载。
transactionTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
// 模板方法中封装了事务的开启、提交以及异常回滚的操作
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
/**
* 重写事务方法内容
*
* @param status 状态
*/
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
}
});
}
/**
* 通过事务管理器使用事务
*/
public void managerTransaction() {
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
// Spring根据事务定义数据,判断如何开始事务。
TransactionStatus transaction = transactionManager.getTransaction(def);
try {
// do something
// 提交事务
transactionManager.commit(transaction);
} catch (Exception e) {
e.printStackTrace();
// 回滚事务
transactionManager.rollback(transaction);
}
}
}
在使用Spring中的注解
@Transaction
进行事务管理时需要注意事务失效的场景,常见的场景如下:
- 在事务方法中调用非事务方法,如A方法被@Transaction修饰,而B未被@Transaction修饰,此时A调用B,B中包含了数据库操作就可能导致事务不生效。
- 指定了错误的事务传播机制,Spring中定义了七种事务传播机制,其实也就是定义了PlatformTransactionManager如何对事务的传播行为进行处理。
- 使用private修饰方法,@Transaction是根据AOP进行拦截,生成模板代码,使用私有修饰时会导致无法被代理从而导致事务失效。
- 指定了错误的rollbackFor、和noRollbackFor也会导致事务无法进行正常处理。
- 使用try catch捕获代码快,在出现异常时为进行回滚操作的。
核心类及接口
前面提到了Spring中的声明式事务核心依赖于AOP来实现,以及常用的几种编码形式,其具体的实现方式到底是什么样的,这里以@Transaction注解为切入点,进行事务实现原理的简析。
在实际应用中@Transaction也是使用最为频繁的一种方式,先查看一下@Transaction的源码:
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Transactional {
// 同transactionManager属性一样,指定事务管理器
@AliasFor("transactionManager")
String value() default "";
@AliasFor("value")
String transactionManager() default "";
// 事务传播机制
Propagation propagation() default Propagation.REQUIRED;
// 事务隔离级别
Isolation isolation() default Isolation.DEFAULT;
// 超时时间,如果不支持超时,则不使用超时
int timeout() default TransactionDefinition.TIMEOUT_DEFAULT;
// 只读标志
boolean readOnly() default false;
// 异常回滚类型
Class<? extends Throwable>[] rollbackFor() default {};
String[] rollbackForClassName() default {};
// 异常不回滚类型
Class<? extends Throwable>[] noRollbackFor() default {};
String[] noRollbackForClassName() default {};
}
在
@Transaction
中定义了一些常见的关于事务操作的参数。Java中注解的用于相当于用一些元数据进行标注,同样使用该主机意味着被标注的方法需要进行事务管理,使用在类上意味着所有方法都要进行事务管理。
既然是一种标记那肯定就有对@Transaction中定义信息进行解析的地方。Spring中的AOP的核心是基于拦截器链,进行实现的,@Transaction的使用业主要聚焦于TransactionInterceptor类,查看一下该类的继承图
TransactionInterceptor类继承自TransactionAspectSupport类,并且实现了MethodInterceptor接口,查看源码:
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
public TransactionInterceptor() {
}
public TransactionInterceptor(PlatformTransactionManager ptm, Properties attributes) {
setTransactionManager(ptm);
setTransactionAttributes(attributes);
}
public TransactionInterceptor(PlatformTransactionManager ptm, TransactionAttributeSource tas) {
setTransactionManager(ptm);
setTransactionAttributeSource(tas);
}
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
// 整个事务处理的方法入口就在这个方法
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
}
需要注意的是MethodInterceptor接口是AOP中定义的接口,TransactionAspectSupport中是Spring框架关于事务处理的具体实现,TransactionInterceptor中源码部分内容非常简单,主要利用了面向对象的思想实现耦合,连接Spring以及AOP。
在invokeWithinTransaction方法中,整个过程参考JDBC同样可以分为三个阶段,开启事务、执行语句、提交\回滚事务。
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
// 获取事务的定义信息,例如XML、注解等定义的事务管理器、隔离级别等事务的基本信息
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
final TransactionManager tm = determineTransactionManager(txAttr);
// 这里可以不做特别关注,使用到Reactive 类型的事务才会走这里
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
throw new TransactionUsageException(
"Unsupported annotated transaction on suspending function detected: " + method +
". Use TransactionalOperator.transactional extensions instead.");
}
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
if (adapter == null) {
throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
method.getReturnType());
}
return new ReactiveTransactionSupport(adapter);
});
return txSupport.invokeWithinTransaction(
method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
}
// 通常Web程序从这里开始
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
// 标准的事务声明,从createTransactionIfNecessary方法开始,对应JDBC中的Begin/Start语句,也可以理解为开启事务,不过Spring中做了更多的判断适配。
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// 执行被增强方法,也就是目标方法的具体实现
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
// 如果代理对象执行过程中出现了异常如何进行下一步,在completeTransactionAfterThrowing方法中体现
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 一个事务执行完成后,会清理线程中对应的事务信息
cleanupTransactionInfo(txInfo);
}
if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
// 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {
// 注意前面if中的条件判断,Spring中的事务通常都被DataSourceTransactionManager进行管理,进入else条件说明是一个CallbackPreferringPlatformTransactionManager类型的事务,也就是通常所说的编程式事务,例如使用TransactionTemplate或者PlatformTransactionManager进行事务控制的代码会在这里执行。
final ThrowableHolder throwableHolder = new ThrowableHolder();
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
// 进行事务开始前的准备
TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
try {
// 执行被增强的方法
Object retVal = invocation.proceedWithInvocation();
if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
return retVal;
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
throwableHolder.throwable = ex;
return null;
}
}
finally {
// 此处只清理线程绑定的事务信息,具体的commit、rollback由用户控制
cleanupTransactionInfo(txInfo);
}
});
// Check result state: It might indicate a Throwable to rethrow.
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
}
// 捕获相应的异常
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
}
catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}
}
}
开启事务
createTransactionIfNecessary方法,主要的作用是判断由DataSourceTransactionManager进行管理的事务,如何创建事务。
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
// 声明事务的运行时信息,将被存储在status对象中
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 根据事务配置属性,创建事务对象信息
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// 返回创建事务的准备结果,并通过该方法中的bindToThread方法调用,将事务信息TransactionInfo对象绑定到本地线程
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
AbstractPlatformTransactionManager类中主要封装了关于事务的基础操作,getTransaction方法主要是事务创建的完整细节,其参数为事务的定义信息,查看源码:
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// Use defaults if no transaction definition given.
// 判断事务的定义信息,如果为null,则使用默认的事务定义
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// doGetTransaction为获取事务信息的具体执行方法,该方法返回了一个事务的包装对象
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 判断当前事务对象是否为存在状态,如果存在且激活直接返回
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(def, transaction, debugEnabled);
}
// Check definition settings for new transaction.
// 如果设置了超时时间校验时间
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
// 如果不存在事务,并且传播行为设置为PROPAGATION_MANDATORY则直接抛出异常
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 如果传播行为为以上类型,挂起当前事务,suspend方法入参为null,会获取当前synchronizations中的事务进行全部挂起操作
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 并开启一个新的事务
doBegin(transaction, def);
// 重新准备事务同步器,本质是使用一个ThreadLocal进行各种信息的存储,相关的变量信息被定义在TransactionSynchronizationManager类中
prepareSynchronization(status, def);
return status;
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// 处理其他传播机制类型的事务,如SUPPORT、NOT_SUPPORT,也就是一个非事务方法
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 返回非事务时的状态信息
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
关于getTransaction方法中的doGetTransaction方法,需要具体的实现,这里引入了Spring-JDBC,以JDBC为例,查看DataSourceTransactionManager类的doGetTransaction的具体实现。
@Override
protected Object doGetTransaction() {
// 声明一个新的DataSourceTransactionObject实例
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
// 设置保存点,并判断是否允许嵌套事务
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// TransactionSynchronizationManager前面提到了定义并且利用ThreadLocal存储了各种事务相关的状态信息,并和当前线程绑定
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
// ConnectionHolder是一个JDBC连接对象的包装,包含了Connection对象,是否激活事务,是否支持保存点等操作
txObject.setConnectionHolder(conHolder, false);
// 返回DataSourceTransactionObject实例
return txObject;
}
Dat前面流程进行了一系列关于事务定义信息的判断,如果传播行为等,以及事务同步信息的缓存,通过ThreadLocal进行与操作线程绑定,实现变量传递,最终主备工作完成后,返回DataSourceTransactionObject实例对象。 开始进行事务真正的开始,查看DataSourceTransactionManager类的doBegin方法
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
// 该方法中入参transaction为doGetTransaction方法返回的DataSourceTransactionObject对象
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
// 判断是否存在ConnectionHolder对象,以及是否同步事务等信息
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
// 获取真实的JDBC连接对象
con = txObject.getConnectionHolder().getConnection();
// 开始准备事务连接,并获取当前事务的隔离级别记录为上一个事务隔离级别
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
// 判断连接对象是否为自动提交
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
// 如果是自动提交,设置自动提交为false,方便使用Spring事务管理进行数据库的事务干预
con.setAutoCommit(false);
}
// 准备事务连接,如果事务为只读,设置为只读模式
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// 绑定事务信息到当前线程
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
// 捕获异常,并且释放连接信息
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
如果使用连接池,会通过连接池代理进行Connection对象的代理操作,ProxyConnection类位于com.zaxxer.hikari.pool包。
public void setAutoCommit(boolean autoCommit) throws SQLException
{
delegate.setAutoCommit(autoCommit);
isAutoCommit = autoCommit;
dirtyBits |= DIRTY_BIT_AUTOCOMMIT;
}
通过连接池的代理,会将连接对象代理到具体的数据库驱动包,这里以MySQL为例,具体的实现 ConnectionImpl类位于com.mysql.cj.jdbc包。
public void setAutoCommit(final boolean autoCommitFlag) throws SQLException {
synchronized (getConnectionMutex()) {
checkClosed();
if (this.connectionLifecycleInterceptors != null) {
IterateBlock<ConnectionLifecycleInterceptor> iter = new IterateBlock<ConnectionLifecycleInterceptor>(
this.connectionLifecycleInterceptors.iterator()) {
@Override
void forEach(ConnectionLifecycleInterceptor each) throws SQLException {
if (!each.setAutoCommit(autoCommitFlag)) {
this.stopIterating = true;
}
}
};
iter.doForAll();
if (!iter.fullIteration()) {
return;
}
}
if (this.autoReconnectForPools.getValue()) {
this.autoReconnect.setValue(true);
}
try {
boolean needsSetOnServer = true;
if (this.useLocalSessionState.getValue() && this.session.getServerSession().isAutoCommit() == autoCommitFlag) {
needsSetOnServer = false;
} else if (!this.autoReconnect.getValue()) {
needsSetOnServer = getSession().isSetNeededForAutoCommitMode(autoCommitFlag);
}
// this internal value must be set first as failover depends on it being set to true to fail over (which is done by most app servers and
// connection pools at the end of a transaction), and the driver issues an implicit set based on this value when it (re)-connects to a
// server so the value holds across connections
this.session.getServerSession().setAutoCommit(autoCommitFlag);
// 判断是否需要使用本地事务,来执行是否开启自动提交
if (needsSetOnServer) {
this.session.execSQL(null, autoCommitFlag ? "SET autocommit=1" : "SET autocommit=0", -1, null, false, this.nullStatementResultSetFactory,
null, false);
}
} finally {
if (this.autoReconnectForPools.getValue()) {
this.autoReconnect.setValue(false);
}
}
return;
}
}
执行语句
再次回到invokeWithinTransaction方法中,关于具体的SQL执行,这里用到了一个环绕类型的AOP增强,具体的操作在proceedWithInvocation方法中,具体代码片段如下:
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 声明式事务,获取事务的定义信息
TransactionAttributeSource tas = getTransactionAttributeSource();
...
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// 开启事务
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// 执行被增强方法,需要被执行的SQL语句
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 异常回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 清理事务信息
cleanupTransactionInfo(txInfo);
}
...
// 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {
// 处理编程式事务
final ThrowableHolder throwableHolder = new ThrowableHolder();
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
// 进行事务开始前的准备
TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
try {
// 执行被增强的方法
Object retVal = invocation.proceedWithInvocation();
if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
return retVal;
}
// 异常处理
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
throwableHolder.throwable = ex;
return null;
}
}
finally {
// 此处只清理线程绑定的事务信息,具体的commit、rollback由用户控制
cleanupTransactionInfo(txInfo);
}
});
// Check result state: It might indicate a Throwable to rethrow.
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
}
// 捕获相应的异常
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
}
catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}
}
}
提交\回滚事务
当增强方法被执行完成后,继续回到TransactionAspectSupport中的invokeWithinTransaction方法,定位到commitTransactionAfterReturning这里,查看提交或者回滚相关的操作。
查看TransactionAspectSupport类中的commitTransactionAfterReturning方法源码:
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// 进行事务的提交
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
接下来查看事务管理器AbstractPlatformTransactionManager类中的commit方法
public final void commit(TransactionStatus status) throws TransactionException {
// 判断是否已提交,抛出异常
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
// 获取事务状态信息
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
// 处理回滚
processRollback(defStatus, false);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus, true);
return;
}
// 如果事务正常执行,进入到提交阶段
processCommit(defStatus);
}
接下来查看AbstractPlatformTransactionManager类中的processCommit方法,具体的提交实现。
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
// 定义事务是否为RollBack标记
boolean unexpectedRollback = false;
// 进行提交相关的准备
prepareForCommit(status);
// 提交前进行一些资源的释放,或者同步信息的释放,可以被覆盖操作
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
// 判断事务是否包含保存点
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
// 释放保存点,清楚保存点信息
status.releaseHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
// 执行事务的提交
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
// 如果全局事务为Rollback,则抛出异常
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
// 捕获异常并进行异常处理
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
// 清理事务状态信息
cleanupAfterCompletion(status);
}
}
进一步查看提交的实现,DataSourceTransactionManager中的doCommit方法,通过DataSourceTransactionObject类获取连接的包装对象,并拿到Connection 真实对象信息,进行提交。
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
con.commit();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}
这里以MySQL为例,具体的提交又数据库驱动封装完成,查看com.mysql.cj.jdbc.ConnectionImpl中的commit方法,进行事务最终的提交。
public void commit() throws SQLException {
// 获取互斥锁
synchronized (getConnectionMutex()) {
// 校验连接关闭
checkClosed();
try {
if (this.connectionLifecycleInterceptors != null) {
IterateBlock<ConnectionLifecycleInterceptor> iter = new IterateBlock<ConnectionLifecycleInterceptor>(
this.connectionLifecycleInterceptors.iterator()) {
@Override
void forEach(ConnectionLifecycleInterceptor each) throws SQLException {
if (!each.commit()) {
this.stopIterating = true;
}
}
};
iter.doForAll();
if (!iter.fullIteration()) {
return;
}
}
if (this.session.getServerSession().isAutoCommit()) {
throw SQLError.createSQLException(Messages.getString("Connection.3"), getExceptionInterceptor());
}
if (this.useLocalTransactionState.getValue()) {
if (!this.session.getServerSession().inTransactionOnServer()) {
return; // effectively a no-op
}
}
// 执行具体的提交语句,进行事务提交
this.session.execSQL(null, "commit", -1, null, false, this.nullStatementResultSetFactory, null, false);
} catch (SQLException sqlException) {
if (MysqlErrorNumbers.SQL_STATE_COMMUNICATION_LINK_FAILURE.equals(sqlException.getSQLState())) {
throw SQLError.createSQLException(Messages.getString("Connection.4"), MysqlErrorNumbers.SQL_STATE_TRANSACTION_RESOLUTION_UNKNOWN,
getExceptionInterceptor());
}
throw sqlException;
} finally {
this.session.setNeedsPing(this.reconnectAtTxEnd.getValue());
}
}
return;
}
通过以上源码进行核心代码的分析,追踪了通过Spring进行事务管理时,关于声明式事务和编程式事务的处理过程,两种事务处理方式并没有明显的优劣之分,声明式事务屏蔽了处理细节,使用简单,使用者无需关心事务的开启提交过程,但是也正是由于其黑盒化
的封装,让使用者无法清楚的知道其执行流程,也无法对于事务的细节进行处理,如长事务过程中,无法对保存点进行操作,在某些场景下事务失效原因不能仔细排查,编程式事务最大的优势就在于对于复杂场景的把控,因此应根据具体情况具体分析。
在代码层面, Spring中事务相关代码位于spring-tx模块中,Spring提供了核心接口:
- TransactionDefinition: 定义了事务基本属性(包含事务隔离级别、传播行为、超时、只读、回滚规则)
- PlatformTransactionManager: 事务管理器,用于事务的提交、回滚等操作。
- TransactionStatus: 事务的运行时状态获取,贯穿于声明式事务的传递
其中还有一些核心类
- TransactionTemplate:事务操作的模板类
- TransactionAspectSupport:AOP事务操作的基本方法封装
- TransactionInterceptor:事务拦截器的入口,基于进行事务处理
- AbstractPlatformTransactionManager:Spring事务处理标准工作流的封装
- DataSourceTransactionManager:Spring中关于数据源操作的标准封装
- TransactionSynchronizationManager:事务执行过程,在本地线程中通过ThreadLocal进行的本地信息封装
关于Spring中事务处理的工作流程,可以参考AbstractPlatformTransactionManager中的定义:
- This base class provides the following workflow handling:
- determines if there is an existing transaction;
- applies the appropriate propagation behavior;
- suspends and resumes transactions if necessary;
- checks the rollback-only flag on commit;
- applies the appropriate modification on rollback (actual rollback or setting rollback-only);
- triggers registered synchronization callbacks (if transaction synchronization is active).
事务传播机制
在事务执行准备阶段进行了事务传播机制的判断,详见getTransaction方法。所谓的Spring事务传播机制主要是Spring中定义的对事务处理的几种方式。传统的事务可以分为扁平事务、链事务、嵌套事务,Spring中对于这些常见的类型进行了封装,衍生出了七种类型。简单理解就是两个被事务修饰或者不修饰的方法进行调用时,Spring的处理方式如何。
-
REQUIRED
Spring中默认配置,如果当前操作不存在事务,创建一个新事务。
@Transactional(propagation= Propagation.REQUIRED)
public void methodA(){
methodB();
// do something
}
@Transactional(propagation= Propagation.REQUIRED)
public void methodB(){
// do something
}
调用methdoA,如果methodB发生异常,触发事务回滚,也会methodA中的也会回滚。
-
SUPPORTS
如果当前存在事务,就加入该事务,如果当前不存在事务,就以非事务执行。
@Transactional(propagation= Propagation.REQUIRED)
public void methodA(){
methodB();
// do something
}
@Transactional(propagation= Propagation.SUPPORTS)
public void methodB(){
// do something
}
如果调用methodA,再调用methodB,MehtodB会加入到MethodA的开启的当前事务中。 如果直接调用methodB,当前没有事务,就以非事务执行。
-
MANDATORY
支持当前事务,如果当前存在事务,就加入该事务,如果当前不存在事务,就抛出异常。
@Transactional(propagation= Propagation.REQUIRED)
public void methodA(){
methodB();
// do something
}
@Transactional(propagation= Propagation.MANDATORY)
public void methodB(){
// do something
}
如果调用methodA,再调用methodB,MehtodB会加入到MethodA的开启的当前事务中。 如果直接调用methodB,当前没有事务,就会抛出异常。
-
REQUIRES_NEW
对于当前操作创建一个新事务,如果当前操作已存在事务,则会挂起当前事务,等待新事务执行完成。
@Transactional(propagation= Propagation.REQUIRED)
public void methodA(){
// do something pre
methodB();
// do something post
}
@Transactional(propagation= Propagation.REQUIRES_NEW)
public void methodB(){
// do something
}
调用methodA,会先开启事务1,执行A的something pre的代码。再调用methodB,methdoB会开启一个事务2,再执行自身的代码。最后在执行methodA的something post。如果method发生异常回滚,只是methodB中的代码回滚,不影响methodA中的代码。如果methodA发生异常回滚,只回滚methodA中的代码,不影响methodB中的代码。 简言之,不会影响别人,也不会被别人影响。
-
NOT_SUPPORTED
以非事务方式执行,如果当前存在事务则挂起当前事务执行。
@Transactional(propagation= Propagation.REQUIRED)
public void methodA(){
methodB();
// do something
}
@Transactional(propagation= Propagation.NOT_SUPPORTED)
public void methodB(){
// do something
}
调用methodA,再调用methodB,methodA开启的事务会被挂起,即在methodB中不齐作用,相当于没有事务,methodB内部抛出异常不会回滚。methodA内的代码发生异常会回滚。 直接调用methodB,不会开启事务。
-
NEVER
以非事务性执行,如果存在事务,则引发异常。
@Transactional(propagation= Propagation.REQUIRED)
public void methodA(){
methodB();
// do something
}
@Transactional(propagation= Propagation.NEVER)
public void methodB(){
// do something
}
-
NESTED
如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则按REQUIRED属性执行
@Transactional(propagation= Propagation.REQUIRED)
public void methodA(){
// do something pre
methodB();
// do something post
}
@Transactional(propagation= Propagation.NESTED)
public void methodB(){
// do something
}
调用methodA,开启一个事务,执行something pre的代码,设置回滚点savepoint,再调用methodB的代码,如果methodB里抛出异常,此时回滚到之前的saveponint。再然后执行methodA里的something post的代码,最后提交或者回滚事务。 嵌套事务,外层的事务如果回滚,会导致内层的事务也回滚;但是内层的事务如果回滚,仅仅是回滚自己的代码,不影响外层的事务的代码。
总结
关于Spring中的事务处理,从事务的类型(声明式事务、编程式事务)、事务的传播机制、事务的执行流程进行源码分析,具体的细节需要自行观看。整个事务执行状态的信息基于ThreadLocal进行线程绑定,封装了对应数据库的驱动执行细节,简化了开发流程。