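Transaction module class structure
Declarative transactions
Declarative transaction processing breaks down into roughly the following steps:
1) Read the transaction configuration from the configuration file and convert the transaction metadata into Spring's internal metadata objects.
Classes involved: TransactionAttributeSourceAdvisor (this advisor abstracts the transaction attribute information into TransactionAttribute objects)
2) Bind the transaction handling objects to the current thread. The TransactionInfo and TransactionStatus data objects record and pass along the execution context during transaction processing.
3) The PlatformTransactionManager carries out the actual transaction operations.
- Starting declarative transaction processing
Declarative transaction configuration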
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>
<bean id="toBeTestService" class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean">
<property name="target">
<!-- the target class to be proxied -->
<bean class="com.test.m.service.ToBeTestService"/>
</property>
<property name="transactionManager" ref="transactionManager"/>
<property name="transactionAttributes">
<props>
<prop key="insert*">PROPAGATION_REQUIRED</prop>
<prop key="update*">PROPAGATION_REQUIRED</prop>
<prop key="select*">PROPAGATION_REQUIRES_NEW</prop>
<prop key="*">PROPAGATION_REQUIRED,readOnly</prop>
</props>
</property>
</bean>
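The entries in transactionAttributes are the interception rules for transaction processing. They are handled by NameMatchTransactionAttributeSource, which stores the rules in a map.
For example: the key is insert* and the value is the TransactionAttribute parsed from PROPAGATION_REQUIRED.
TransactionProxyFactoryBean is a FactoryBean and is the entry point for generating proxies with transactional behavior.
TransactionProxyFactoryBean inheritance hierarchy
First, a rough look at how the transactional proxy object is built
Now for the details
TransactionProxyFactoryBean implements both InitializingBean and FactoryBean, so its initialization callback in the IoC container is the afterPropertiesSet method.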
@Override
public void afterPropertiesSet() {
if (this.target == null) {
throw new IllegalArgumentException("Property 'target' is required");
}
if (this.target instanceof String) {
throw new IllegalArgumentException("'target' needs to be a bean reference, not a bean name as value");
}
if (this.proxyClassLoader == null) {
this.proxyClassLoader = ClassUtils.getDefaultClassLoader();
}
// Create the ProxyFactory that will build the transactional proxy
ProxyFactory proxyFactory = new ProxyFactory();
// Add the custom pre-interceptors
if (this.preInterceptors != null) {
for (Object interceptor : this.preInterceptors) {
proxyFactory.addAdvisor(this.advisorAdapterRegistry.wrap(interceptor));
}
}
// Create the AOP advisor for transaction processing
proxyFactory.addAdvisor(this.advisorAdapterRegistry.wrap(createMainInterceptor()));
// Add the custom post-interceptors
if (this.postInterceptors != null) {
for (Object interceptor : this.postInterceptors) {
proxyFactory.addAdvisor(this.advisorAdapterRegistry.wrap(interceptor));
}
}
proxyFactory.copyFrom(this);
// Create the AOP target source
TargetSource targetSource = createTargetSource(this.target);
proxyFactory.setTargetSource(targetSource);
if (this.proxyInterfaces != null) {
proxyFactory.setInterfaces(this.proxyInterfaces);
}
else if (!isProxyTargetClass()) {
// Rely on AOP infrastructure to tell us what interfaces to proxy.
proxyFactory.setInterfaces(
ClassUtils.getAllInterfacesForClass(targetSource.getTargetClass(), this.proxyClassLoader));
}
postProcessProxyFactory(proxyFactory);
// Create the transactional proxy object
this.proxy = proxyFactory.getProxy(this.proxyClassLoader);
}
/**
* Create the AOP advisor for transaction processing
*/
protected Object createMainInterceptor() {
this.transactionInterceptor.afterPropertiesSet();
if (this.pointcut != null) {
// A pointcut is configured: use the default advisor and attach the transaction interceptor
return new DefaultPointcutAdvisor(this.pointcut, this.transactionInterceptor);
}
else {
// No pointcut is configured: use TransactionAttributeSourceAdvisor and attach the transaction interceptor
return new TransactionAttributeSourceAdvisor(this.transactionInterceptor);
}
}
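- Initializing the transaction advisor
The analysis here follows TransactionAttributeSourceAdvisor.
An Advisor needs two elements: a pointcut (Pointcut) and an advice (here, an Interceptor). The pointcut is the TransactionAttributeSourcePointcut declared inside TransactionAttributeSourceAdvisor, and its matches method decides whether a target method should be intercepted.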
@Override
public boolean matches(Method method, Class<?> targetClass) {
if (TransactionalProxy.class.isAssignableFrom(targetClass)) {
return false;
}
// Get the TransactionAttributeSource (the interception rules from the configuration) from the TransactionInterceptor;
// the concrete implementation here is NameMatchTransactionAttributeSource
TransactionAttributeSource tas = getTransactionAttributeSource();
return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}
The concrete matching rule (NameMatchTransactionAttributeSource.getTransactionAttribute)
/**
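* Match the method name against the configured wildcard patterns (simple "xxx*" style patterns, not regular
* expressions) to find a transaction attribute object (propagation, isolation level, read-only flag and so on).
* If a TransactionAttribute is found, TransactionInterceptor will apply transaction handling to the target method.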
*/
@Override
public TransactionAttribute getTransactionAttribute(Method method, Class<?> targetClass) {
if (!ClassUtils.isUserLevelMethod(method)) {
return null;
}
// Look for direct name match.
String methodName = method.getName();
TransactionAttribute attr = this.nameMap.get(methodName);
if (attr == null) {
// Look for most specific name match.
String bestNameMatch = null;
for (String mappedName : this.nameMap.keySet()) {
if (isMatch(methodName, mappedName) &&
(bestNameMatch == null || bestNameMatch.length() <= mappedName.length())) {
attr = this.nameMap.get(mappedName);
bestNameMatch = mappedName;
}
}
}
return attr;
}
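The "exact name first, then the most specific pattern" rule above can be tried out without Spring. The following is only a minimal sketch: NameMatchSketch and its isMatch method are simplified stand-ins written for this note (only leading and trailing * are supported), and attributes are kept as plain strings rather than TransactionAttribute objects.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for name-based attribute lookup (not Spring's class).
class NameMatchSketch {

    private final Map<String, String> nameMap = new LinkedHashMap<>();

    void addRule(String pattern, String attribute) {
        nameMap.put(pattern, attribute);
    }

    // Supports exact names plus "xxx*", "*xxx" and "*xxx*" style patterns only.
    static boolean isMatch(String methodName, String pattern) {
        if (pattern.equals("*")) {
            return true;
        }
        boolean leading = pattern.startsWith("*");
        boolean trailing = pattern.endsWith("*");
        if (leading && trailing) {
            return methodName.contains(pattern.substring(1, pattern.length() - 1));
        }
        if (trailing) {
            return methodName.startsWith(pattern.substring(0, pattern.length() - 1));
        }
        if (leading) {
            return methodName.endsWith(pattern.substring(1));
        }
        return methodName.equals(pattern);
    }

    // Exact name first; otherwise the longest matching pattern wins.
    String lookup(String methodName) {
        String attr = nameMap.get(methodName);
        if (attr != null) {
            return attr;
        }
        String best = null;
        for (String pattern : nameMap.keySet()) {
            if (isMatch(methodName, pattern) && (best == null || best.length() <= pattern.length())) {
                best = pattern;
                attr = nameMap.get(pattern);
            }
        }
        return attr;
    }

    public static void main(String[] args) {
        NameMatchSketch source = new NameMatchSketch();
        source.addRule("insert*", "PROPAGATION_REQUIRED");
        source.addRule("select*", "PROPAGATION_REQUIRES_NEW");
        source.addRule("*", "PROPAGATION_REQUIRED,readOnly");
        System.out.println(source.lookup("insertUser"));
        System.out.println(source.lookup("deleteUser"));
    }
}
```

With the rules from the XML configuration, insertUser resolves through insert* because it is longer than the catch-all *, while deleteUser falls through to *.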
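- How the transaction interceptor works
In this example the transactional AOP proxy is created by CglibAopProxy (when the target implements interfaces, Spring uses a JDK dynamic proxy instead). The call flow is roughly as follows.
Either way the call ends up in the interceptor, so the method to look at is invoke in TransactionInterceptor.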
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
@Override
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
});
}
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
throws Throwable {
// Get the transaction attributes from the TransactionAttributeSource
final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
// Resolve the concrete transaction manager from the TransactionProxyFactoryBean configuration
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass);
// Different kinds of PlatformTransactionManager are invoked differently: a CallbackPreferringPlatformTransactionManager drives the transaction through a callback, whereas the commonly used DataSourceTransactionManager is not a CallbackPreferringPlatformTransactionManager
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
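// Create the transaction if necessary; the returned object holds the transaction status
// TransactionInfo bundles everything about the transaction: the PlatformTransactionManager, the TransactionAttribute and the TransactionStatus
// ⚠️ Transaction management goes through the TransactionInfo object, which wraps the transaction object and the transaction's status information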
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// Invoke the rest of the interceptor chain and the target method
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// Roll back (or commit, if the transaction attribute does not call for a rollback on this exception)
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
// Commit
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
new TransactionCallback<Object>() {
@Override
public Object doInTransaction(TransactionStatus status) {
TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
try {
return invocation.proceedWithInvocation();
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
return new ThrowableHolder(ex);
}
}
finally {
cleanupTransactionInfo(txInfo);
}
}
});
// Check result: It might indicate a Throwable to rethrow.
if (result instanceof ThrowableHolder) {
throw ((ThrowableHolder) result).getThrowable();
}
else {
return result;
}
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
}
}
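Flow chart
1) Read the transaction attribute configuration of the transactional method
2) Obtain the concrete PlatformTransactionManager implementation
3) Decide whether to create a new transaction (analyzed in detail later)
4) Invoke the interceptor chain and the target object
5) Commit or roll back depending on the outcome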
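The five steps can be mimicked with a plain JDK dynamic proxy. This is a minimal sketch, not Spring's implementation: OrderService and TxFlowSketch are hypothetical names, and "begin", "commit" and "rollback" are just strings appended to a list in place of a real transaction manager.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical service interface used only for this illustration.
interface OrderService {
    void placeOrder(boolean fail);
}

class TxFlowSketch {

    // Records what a real transaction manager would have been asked to do.
    static final List<String> events = new ArrayList<>();

    static OrderService proxyFor(OrderService target) {
        InvocationHandler handler = (proxy, method, args) -> {
            events.add("begin");                       // step 3: create the transaction
            Object result;
            try {
                result = method.invoke(target, args);  // step 4: call the target
            } catch (InvocationTargetException ex) {
                events.add("rollback");                // step 5: failure path
                throw ex.getCause();                   // rethrow the original exception
            }
            events.add("commit");                      // step 5: success path
            return result;
        };
        return (OrderService) Proxy.newProxyInstance(
                OrderService.class.getClassLoader(), new Class<?>[] {OrderService.class}, handler);
    }

    public static void main(String[] args) {
        OrderService service = proxyFor(fail -> {
            events.add("target");
            if (fail) {
                throw new IllegalStateException("boom");
            }
        });
        service.placeOrder(false);
        try {
            service.placeOrder(true);
        } catch (IllegalStateException expected) {
            // the caller still sees the original exception after the rollback
        }
        System.out.println(events);
    }
}
```

The caller only ever talks to the proxy, which is why the transaction boundaries stay invisible to the business code.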
- Creating a transaction
Creating a transaction programmatically
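// dataSource is assumed to be an already configured javax.sql.DataSource
DataSourceTransactionManager manager = new DataSourceTransactionManager(dataSource);
TransactionDefinition td = new DefaultTransactionDefinition();
TransactionStatus transaction = manager.getTransaction(td);
try {
    // todo: business logic
} catch (Exception e) {
    // roll back and rethrow so that the commit below is never reached
    manager.rollback(transaction);
    throw e;
}
manager.commit(transaction);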
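Transaction creation flow chart
The whole process in one sentence: DataSourceTransactionManager creates a TransactionStatus, the TransactionStatus is stored in the current TransactionInfo, and that TransactionInfo is bound to the current thread.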
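Binding to the current thread boils down to a ThreadLocal. Here is a minimal sketch, assuming each info object remembers the one it replaced so that nested calls can restore it afterwards; TxInfoSketch and its method names are made up for this note.

```java
// Simplified stand-in for thread-bound transaction info (names are illustrative).
class TxInfoSketch {

    private static final ThreadLocal<TxInfoSketch> HOLDER = new ThreadLocal<>();

    final String joinpoint;
    private TxInfoSketch previous;   // the info that was bound before this one

    TxInfoSketch(String joinpoint) {
        this.joinpoint = joinpoint;
    }

    // Bind this info to the current thread, remembering the one it replaces.
    void bindToThread() {
        previous = HOLDER.get();
        HOLDER.set(this);
    }

    // Put back whatever was bound before, as done once the method call completes.
    void restore() {
        HOLDER.set(previous);
    }

    static TxInfoSketch current() {
        return HOLDER.get();
    }

    public static void main(String[] args) {
        TxInfoSketch outer = new TxInfoSketch("OrderService.placeOrder");
        outer.bindToThread();
        TxInfoSketch inner = new TxInfoSketch("StockService.reserve");
        inner.bindToThread();
        System.out.println(current().joinpoint);
        inner.restore();
        System.out.println(current().joinpoint);
        outer.restore();
        System.out.println(current());
    }
}
```

Because the state lives in a ThreadLocal, work handed to another thread does not see the caller's transaction information.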
Now let's analyze the getTransaction method, which DataSourceTransactionManager inherits from AbstractPlatformTransactionManager
@Override
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
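// Get the transaction object (for DataSourceTransactionManager it wraps the ConnectionHolder bound to the current thread, if any)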
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
definition = new DefaultTransactionDefinition();
}
// If a transaction already exists, handle the request according to the propagation behavior
if (isExistingTransaction(transaction)) {
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// Check that the timeout in the transaction definition is valid
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// No transaction exists yet: proceed according to the propagation behavior
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// Create the TransactionStatus
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// Begin the transaction (calls setAutoCommit(false) on the Connection and setTransactionActive(true) on the ConnectionHolder)
doBegin(transaction, definition);
// Initialize the thread-local state kept for the current thread in TransactionSynchronizationManager
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException ex) {
resume(null, suspendedResources);
throw ex;
}
catch (Error err) {
resume(null, suspendedResources);
throw err;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
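Creating a new transaction is easy to follow: a transaction object is created according to the transaction attributes and stored in the TransactionStatus, and the transaction attribute information is bound to the current thread.
When a transaction already exists, the method to focus on is AbstractPlatformTransactionManager.handleExistingTransaction, which handles the request according to the propagation behavior configured in the transaction attributes.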
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// Propagation NEVER: an existing transaction is not allowed
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// Propagation NOT_SUPPORTED: run without a transaction
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
// Suspend the current transaction
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// transaction is null and newTransaction is false here: the method does not run inside a transaction, and the suspended transaction's information is kept in the current TransactionStatus
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// Propagation REQUIRES_NEW: start a new transaction
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
catch (Error beginErr) {
resumeAfterBeginException(transaction, suspendedResources, beginErr);
throw beginErr;
}
}
// Create a nested transaction
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
// Check that the current definition is compatible with the existing transaction; throw if it is not
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// The third argument, false, means the current method does not run in a new transaction
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
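Read together, getTransaction and handleExistingTransaction amount to a decision table over the propagation behavior and whether a transaction already exists. A minimal sketch of that table follows; the enums and the decide method are invented for this note, and NESTED is assumed to take the savepoint branch, as it does with JDBC.

```java
// Decision table for propagation handling; all names here are illustrative.
class PropagationSketch {

    enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

    enum Action {
        CREATE_NEW, JOIN_EXISTING, RUN_WITHOUT_TX,
        SUSPEND_AND_CREATE_NEW, SUSPEND_AND_RUN_WITHOUT_TX, NESTED_SAVEPOINT
    }

    static Action decide(Propagation propagation, boolean existingTransaction) {
        if (existingTransaction) {
            // Mirrors the branches of handleExistingTransaction.
            switch (propagation) {
                case NEVER:
                    throw new IllegalStateException("Existing transaction found for propagation 'never'");
                case NOT_SUPPORTED:
                    return Action.SUSPEND_AND_RUN_WITHOUT_TX;
                case REQUIRES_NEW:
                    return Action.SUSPEND_AND_CREATE_NEW;
                case NESTED:
                    return Action.NESTED_SAVEPOINT;   // assumes the savepoint branch
                default:
                    return Action.JOIN_EXISTING;      // REQUIRED, SUPPORTS, MANDATORY
            }
        }
        // Mirrors the "no existing transaction" part of getTransaction.
        switch (propagation) {
            case MANDATORY:
                throw new IllegalStateException("No existing transaction found for propagation 'mandatory'");
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                return Action.CREATE_NEW;
            default:
                return Action.RUN_WITHOUT_TX;         // SUPPORTS, NOT_SUPPORTED, NEVER
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            for (boolean existing : new boolean[] {false, true}) {
                String outcome;
                try {
                    outcome = decide(p, existing).name();
                } catch (IllegalStateException ex) {
                    outcome = "EXCEPTION";
                }
                System.out.println(p + ", existing=" + existing + " -> " + outcome);
            }
        }
    }
}
```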
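- Suspending a transaction
The suspension mechanism can be understood roughly as follows:
1) Take the previous transaction's connection resources, wrap them in a SuspendedResourcesHolder and return it
2) Call doBegin to create a new connection object and new transaction information
3) Store the SuspendedResourcesHolder in the new TransactionStatus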
protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
// Get the previous transaction's connection resources
suspendedResources = doSuspend(transaction);
}
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
// Wrap and return the previous transaction's state
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
catch (Error err) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw err;
}
}
else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
// Neither transaction nor synchronization active.
return null;
}
}
@Override
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
// Get the previous transaction's connection holder and unbind it from the current thread's ThreadLocal
ConnectionHolder conHolder = (ConnectionHolder)
TransactionSynchronizationManager.unbindResource(this.dataSource);
return conHolder;
}
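Suspending and resuming comes down to unbinding a resource from the thread and binding it back later. The sketch below shows that idea with a per-thread map; SuspendSketch, the "ds" key and the connection strings are placeholders, not Spring types.

```java
import java.util.HashMap;
import java.util.Map;

// Per-thread resource registry with suspend/resume; all names are illustrative.
class SuspendSketch {

    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static void bind(Object key, Object value) {
        RESOURCES.get().put(key, value);
    }

    static Object unbind(Object key) {
        return RESOURCES.get().remove(key);
    }

    static Object get(Object key) {
        return RESOURCES.get().get(key);
    }

    // Suspend: detach the outer transaction's resource and hand it back to the caller.
    static Object suspend(Object key) {
        return unbind(key);
    }

    // Resume: bind the previously suspended resource again.
    static void resume(Object key, Object suspended) {
        if (suspended != null) {
            bind(key, suspended);
        }
    }

    public static void main(String[] args) {
        bind("ds", "outer-connection");
        Object suspended = suspend("ds");     // outer transaction is parked
        bind("ds", "inner-connection");       // inner transaction gets its own connection
        System.out.println(get("ds"));
        unbind("ds");                         // inner transaction completes
        resume("ds", suspended);              // outer transaction continues
        System.out.println(get("ds"));
    }
}
```

The suspended value plays the role of SuspendedResourcesHolder: it has to be kept somewhere (in Spring, inside the new TransactionStatus) until the inner transaction completes.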
- Committing a transaction
Go straight to the commit flow in AbstractPlatformTransactionManager.processCommit
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
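// Preparation before commit. With MyBatis integration, the SqlSession is committed from the synchronization callback triggered below (triggerBeforeCommit); the JDBC commit on the Spring-managed connection still happens in doCommit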
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
boolean globalRollbackOnly = false;
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
globalRollbackOnly = status.isGlobalRollbackOnly();
}
// Nested transaction (savepoint) handling
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
status.releaseHeldSavepoint();
}
// Is this a new transaction?
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
// The actual commit is done by the concrete transaction manager
doCommit(status);
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (globalRollbackOnly) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
catch (Error err) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, err);
throw err;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
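⚠️ With MyBatis integration, MyBatis takes part through a transaction synchronization: its callbacks commit and close the SqlSession, while the commit or rollback on the Spring-managed JDBC connection is still issued by the transaction manager. For details see the org.mybatis.spring.SqlSessionUtils.SqlSessionSynchronization class in the MyBatis source.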
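The order of the synchronization callbacks around doCommit is easier to see in isolation. This is a minimal sketch of the success path only: the Sync interface and the commit method are simplified stand-ins written for this note, and error handling is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Success-path callback ordering around a commit; names are illustrative.
class CommitOrderSketch {

    interface Sync {
        default void beforeCommit() {}
        default void beforeCompletion() {}
        default void afterCommit() {}
        default void afterCompletion(String status) {}
    }

    static void commit(boolean newTransaction, List<Sync> syncs, Runnable doCommit) {
        for (Sync s : syncs) {
            s.beforeCommit();
        }
        for (Sync s : syncs) {
            s.beforeCompletion();
        }
        if (newTransaction) {
            doCommit.run();          // only the transaction's originator really commits
        }
        try {
            for (Sync s : syncs) {
                s.afterCommit();
            }
        } finally {
            for (Sync s : syncs) {
                s.afterCompletion("COMMITTED");
            }
        }
    }

    // A synchronization that just records which callbacks were invoked.
    static Sync recording(List<String> log) {
        return new Sync() {
            @Override public void beforeCommit() { log.add("beforeCommit"); }
            @Override public void beforeCompletion() { log.add("beforeCompletion"); }
            @Override public void afterCommit() { log.add("afterCommit"); }
            @Override public void afterCompletion(String status) { log.add("afterCompletion:" + status); }
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Sync> syncs = new ArrayList<>();
        syncs.add(recording(log));
        commit(true, syncs, () -> log.add("doCommit"));
        System.out.println(log);
    }
}
```

A participating (non-new) transaction runs the same callbacks but skips doCommit, which matches the isNewTransaction check in processCommit.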
- Rolling back a transaction
Straight to the code
private void processRollback(DefaultTransactionStatus status) {
try {
try {
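// With MyBatis integration, the SqlSession cleanup callback (SqlSessionSynchronization.beforeCompletion) runs here; the JDBC rollback itself happens in doRollback below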
triggerBeforeCompletion(status);
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
doRollback(status);
}
else if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
}
catch (RuntimeException ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
catch (Error err) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw err;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
}
finally {
cleanupAfterCompletion(status);
}
}
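The branching inside processRollback can be summarized as a small decision function. This is only a sketch: the Action enum and the boolean parameters are hypothetical names that mirror the checks in the code above.

```java
// Which rollback action applies to a given transaction status; names are illustrative.
class RollbackSketch {

    enum Action { ROLLBACK_TO_SAVEPOINT, ROLLBACK, MARK_ROLLBACK_ONLY, LEAVE_TO_ORIGINATOR, NOTHING }

    static Action decide(boolean hasSavepoint, boolean newTransaction, boolean hasTransaction,
                         boolean localRollbackOnly, boolean globalRollbackOnParticipationFailure) {
        if (hasSavepoint) {
            return Action.ROLLBACK_TO_SAVEPOINT;      // nested transaction
        }
        if (newTransaction) {
            return Action.ROLLBACK;                   // this status started the transaction
        }
        if (hasTransaction) {
            // Participating in someone else's transaction: it can only be marked.
            if (localRollbackOnly || globalRollbackOnParticipationFailure) {
                return Action.MARK_ROLLBACK_ONLY;
            }
            return Action.LEAVE_TO_ORIGINATOR;
        }
        return Action.NOTHING;                        // no transaction available
    }

    public static void main(String[] args) {
        System.out.println(decide(true, false, true, false, true));
        System.out.println(decide(false, true, true, false, true));
        System.out.println(decide(false, false, true, false, true));
        System.out.println(decide(false, false, false, false, true));
    }
}
```

The third case is the one that later surfaces as an UnexpectedRollbackException when the outer transaction tries to commit.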
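PlatformTransactionManager implementations
The four most important methods in transaction processing:
1) doGetTransaction: obtain the connection resources and wrap them in a transaction object
2) doBegin: begin the transaction
3) doCommit: commit
4) doRollback: roll back
All four are implemented by the concrete TransactionManager configured on the TransactionProxyFactoryBean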
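These four methods are hooks of a template-method design: the abstract manager fixes the workflow and each concrete manager supplies the resource-specific parts. The sketch below assumes a much simpler workflow than the real one; TemplateManagerSketch and InMemoryManagerSketch are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// The abstract class owns the workflow; subclasses supply the four hooks.
abstract class TemplateManagerSketch {

    final Object begin() {
        Object transaction = doGetTransaction();
        doBegin(transaction);
        return transaction;
    }

    final void commit(Object transaction) {
        doCommit(transaction);
    }

    final void rollback(Object transaction) {
        doRollback(transaction);
    }

    protected abstract Object doGetTransaction();

    protected abstract void doBegin(Object transaction);

    protected abstract void doCommit(Object transaction);

    protected abstract void doRollback(Object transaction);
}

// Toy implementation that only records which hooks were called.
class InMemoryManagerSketch extends TemplateManagerSketch {

    final List<String> calls = new ArrayList<>();

    @Override
    protected Object doGetTransaction() {
        calls.add("doGetTransaction");
        return new Object();
    }

    @Override
    protected void doBegin(Object transaction) {
        calls.add("doBegin");
    }

    @Override
    protected void doCommit(Object transaction) {
        calls.add("doCommit");
    }

    @Override
    protected void doRollback(Object transaction) {
        calls.add("doRollback");
    }

    public static void main(String[] args) {
        InMemoryManagerSketch manager = new InMemoryManagerSketch();
        Object tx = manager.begin();
        manager.commit(tx);
        System.out.println(manager.calls);
    }
}
```

Swapping DataSourceTransactionManager for HibernateTransactionManager follows the same pattern: the workflow in the abstract class stays the same and only the hooks change.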
Take a quick look at the execution flow of DataSourceTransactionManager and HibernateTransactionManager. Their source is fairly simple, so it is not analyzed here for now.