Spring Source Code Study, Part 5 – Understanding Transactions Through XML Configuration





Starting from a simple JDBC transaction configuration



Basic configuration

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/aop https://www.springframework.org/schema/aop/spring-aop.xsd
        http://www.springframework.org/schema/tx https://www.springframework.org/schema/tx/spring-tx.xsd">

    <bean id="bookService" class="testtx.impl.BookServiceImpl"/>

    <tx:advice id="txAdvice" transaction-manager="txManager">
        <tx:attributes>
            <tx:method name="get*" read-only="true"/>
            <tx:method name="*"/>
        </tx:attributes>
    </tx:advice>

    <aop:config>
        <aop:pointcut id="bookServiceOp" expression="execution(* testtx.BookService.*(..))"/>
        <aop:advisor advice-ref="txAdvice" pointcut-ref="bookServiceOp" id="advisor0"/>
    </aop:config>

    <bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://localhost:13306/test"/>
        <property name="username" value="root"/>
        <property name="password" value="12345678"/>
    </bean>

    <bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"/>
    </bean>

</beans>



Overview of the processing flow

First, look at the beans that get created and their types:

ID class

bookService com.sun.proxy.$Proxy15
txAdvice org.springframework.transaction.interceptor.TransactionInterceptor
org.springframework.aop.config.internalAutoProxyCreator org.springframework.aop.aspectj.autoproxy.AspectJAwareAdvisorAutoProxyCreator
bookServiceOp org.springframework.aop.aspectj.AspectJExpressionPointcut
advisor0 org.springframework.aop.support.DefaultBeanFactoryPointcutAdvisor
dataSource org.apache.commons.dbcp2.BasicDataSource
txManager org.springframework.jdbc.datasource.DataSourceTransactionManager

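  1. bookService is actually a proxy object. Calls to its methods are intercepted and then handled by txAdvice.
  2. txAdvice is a TransactionInterceptor, which manages the transaction. Its invoke method is implemented as follows: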

    @Override
    @Nullable
    public Object invoke(MethodInvocation invocation) throws Throwable {
    	// Work out the target class: may be {@code null}.
    	// The TransactionAttributeSource should be passed the target class
    	// as well as the method, which may be from an interface.
    	Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    
    	// Adapt to TransactionAspectSupport's invokeWithinTransaction...
    	return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
    		@Override
    		@Nullable
    		public Object proceedWithInvocation() throws Throwable {
    			return invocation.proceed();
    		}
    		@Override
    		public Object getTarget() {
    			return invocation.getThis();
    		}
    		@Override
    		public Object[] getArguments() {
    			return invocation.getArguments();
    		}
    	});
    }
    

With this rough picture in mind, let's walk through how the whole flow works.



tx:advice (advice)

The handler for the tx namespace, TxNamespaceHandler, is registered in the spring.handlers file (under META-INF) of the spring-tx module:

http\://www.springframework.org/schema/tx=org.springframework.transaction.config.TxNamespaceHandler

public class TxNamespaceHandler extends NamespaceHandlerSupport {

	static final String TRANSACTION_MANAGER_ATTRIBUTE = "transaction-manager";

	static final String DEFAULT_TRANSACTION_MANAGER_BEAN_NAME = "transactionManager";


	static String getTransactionManagerName(Element element) {
		return (element.hasAttribute(TRANSACTION_MANAGER_ATTRIBUTE) ?
				element.getAttribute(TRANSACTION_MANAGER_ATTRIBUTE) : DEFAULT_TRANSACTION_MANAGER_BEAN_NAME);
	}


	@Override
	public void init() {
		registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
		registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
		registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
	}
}

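From the implementation of TxNamespaceHandler we can learn the following:

  1. The transaction manager used by tx:advice is named by the transaction-manager attribute; if the attribute is absent, the bean with id transactionManager is used.
  2. The advice element is parsed by TxAdviceBeanDefinitionParser and registered as a BeanDefinition.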
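
To make the fallback rule in point 1 concrete, here is a minimal sketch that applies the same check to a plain DOM element using only the JDK. The class name TxManagerNameSketch and the parse helper are made up for this illustration; only the two constants and the fallback logic mirror the source quoted above.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;

class TxManagerNameSketch {

    static final String TRANSACTION_MANAGER_ATTRIBUTE = "transaction-manager";
    static final String DEFAULT_TRANSACTION_MANAGER_BEAN_NAME = "transactionManager";

    // Same fallback rule as TxNamespaceHandler.getTransactionManagerName.
    static String getTransactionManagerName(Element element) {
        return element.hasAttribute(TRANSACTION_MANAGER_ATTRIBUTE)
                ? element.getAttribute(TRANSACTION_MANAGER_ATTRIBUTE)
                : DEFAULT_TRANSACTION_MANAGER_BEAN_NAME;
    }

    // Helper for this sketch only: parse a one-element XML snippet with the JDK parser.
    static Element parse(String xml) {
        try {
            return DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
                    .getDocumentElement();
        } catch (Exception ex) {
            throw new IllegalStateException("Could not parse snippet", ex);
        }
    }

    public static void main(String[] args) {
        // Attribute present: its value wins.
        System.out.println(getTransactionManagerName(parse("<advice transaction-manager=\"txManager\"/>")));
        // Attribute absent: the default bean name is used.
        System.out.println(getTransactionManagerName(parse("<advice/>")));
    }
}
```

In the configuration at the top of this article the attribute is set to txManager, so the default name never comes into play.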



Registering tx:advice – TxAdviceBeanDefinitionParser

There are a few parts to handle:

  1. The bean type: the registered class is TransactionInterceptor.class

    @Override
    protected Class<?> getBeanClass(Element element) {
    	return TransactionInterceptor.class;
    }
    
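  2. The name of the transactionManager bean. It can be given through the attribute; otherwise it defaults to "transactionManager".
  3. Handling the attributes element, which is the key part. Its child elements are <tx:method>, and each method carries its own transaction settings:

    • name: the method name pattern. For example, "*" matches every method and "get*" matches every method whose name starts with get
    • propagation: the propagation behavior. The default is REQUIRED; other common values are REQUIRES_NEW and NESTED
    • isolation: the isolation level. The default is DEFAULT; the other options are READ_UNCOMMITTED, READ_COMMITTED, REPEATABLE_READ and SERIALIZABLE
    • timeout: the transaction timeout in seconds. The default is -1, meaning no explicit timeout
    • read-only: whether the transaction is read-only. The default is false
    • rollback-for: exceptions that trigger a rollback; multiple values are separated by commas
    • no-rollback-for: exceptions that do not trigger a rollback; multiple values are separated by commas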

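The handling of attributes can be found in TxAdviceBeanDefinitionParser#parseAttributeSource. Once the attributes element has been processed, the result is recorded on the tx:advice bean definition as the value of the transactionAttributeSource property.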
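
The configuration in this article declares both "get*" and "*", so a method such as getBook matches two entries. The sketch below shows one way such overlaps can be resolved: the longest matching pattern wins. This is my understanding of how Spring's name-based attribute source behaves, not a copy of its code, so treat the rule as an assumption. The class MethodNameMatchSketch, its methods, and the simplified matcher (only "xxx*", "*xxx" and "*") are invented for illustration, and a boolean read-only flag stands in for the full transaction attribute.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class MethodNameMatchSketch {

    // pattern -> read-only flag, standing in for the full transaction attribute
    private final Map<String, Boolean> readOnlyByPattern = new LinkedHashMap<>();

    void addMethod(String pattern, boolean readOnly) {
        readOnlyByPattern.put(pattern, readOnly);
    }

    // Simplified matcher: supports only "*", "xxx*", "*xxx" and exact names.
    static boolean matches(String pattern, String methodName) {
        if (pattern.equals("*")) {
            return true;
        }
        if (pattern.endsWith("*")) {
            return methodName.startsWith(pattern.substring(0, pattern.length() - 1));
        }
        if (pattern.startsWith("*")) {
            return methodName.endsWith(pattern.substring(1));
        }
        return pattern.equals(methodName);
    }

    // Returns the flag of the longest matching pattern, or null if nothing matches.
    Boolean resolveReadOnly(String methodName) {
        String best = null;
        for (String pattern : readOnlyByPattern.keySet()) {
            if (matches(pattern, methodName) && (best == null || best.length() <= pattern.length())) {
                best = pattern;
            }
        }
        return best == null ? null : readOnlyByPattern.get(best);
    }

    public static void main(String[] args) {
        MethodNameMatchSketch source = new MethodNameMatchSketch();
        source.addMethod("get*", true);   // <tx:method name="get*" read-only="true"/>
        source.addMethod("*", false);     // <tx:method name="*"/>
        System.out.println(source.resolveReadOnly("getBook"));  // true: "get*" is more specific
        System.out.println(source.resolveReadOnly("saveBook")); // false: only "*" matches
    }
}
```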

TxAdviceBeanDefinitionParser.doParse is implemented as follows:

@Override
protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) {
	builder.addPropertyReference("transactionManager", TxNamespaceHandler.getTransactionManagerName(element));

	List<Element> txAttributes = DomUtils.getChildElementsByTagName(element, ATTRIBUTES_ELEMENT);
	if (txAttributes.size() > 1) {
		parserContext.getReaderContext().error(
				"Element <attributes> is allowed at most once inside element <advice>", element);
	}
	else if (txAttributes.size() == 1) {
		// Using attributes source.
		Element attributeSourceElement = txAttributes.get(0);
		RootBeanDefinition attributeSourceDefinition = parseAttributeSource(attributeSourceElement, parserContext);
		builder.addPropertyValue("transactionAttributeSource", attributeSourceDefinition);
	}
	else {
		// Assume annotations source.
		builder.addPropertyValue("transactionAttributeSource",
				new RootBeanDefinition("org.springframework.transaction.annotation.AnnotationTransactionAttributeSource"));
	}
}



Creating the proxy object

<aop:config>
    <aop:pointcut id="bookServiceOp" expression="execution(* testtx.BookService.*(..))"/>
    <aop:advisor advice-ref="txAdvice" pointcut-ref="bookServiceOp" id="advisor0"/>
</aop:config>

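The aop namespace is handled by AopNamespaceHandler. Before it starts parsing the aop configuration, it registers an infrastructure BeanDefinition, which is instantiated ahead of all user-defined beans. This is the org.springframework.aop.aspectj.autoproxy.AspectJAwareAdvisorAutoProxyCreator seen in the bean list at the beginning.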

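AspectJAwareAdvisorAutoProxyCreator is an implementation of InstantiationAwareBeanPostProcessor. When a bean that matches the pointcut (bookService in this example) is created, it builds a proxy object and returns that instead. This is why the type obtained for bookService is a proxy class.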
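
To see why the bean ends up with a type like com.sun.proxy.$Proxy15, here is a minimal sketch using the JDK's own java.lang.reflect.Proxy. It is not Spring's proxy factory. The BookService interface and its getBook method are hypothetical stand-ins for testtx.BookService, and the handler only records events instead of managing a transaction.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class ProxySketch {

    // Hypothetical service interface, standing in for testtx.BookService.
    interface BookService {
        String getBook(int id);
    }

    static class BookServiceImpl implements BookService {
        @Override
        public String getBook(int id) {
            return "book-" + id;
        }
    }

    // Records what happened so the interception is visible.
    static final List<String> events = new ArrayList<>();

    // Wraps the target in a JDK dynamic proxy whose handler runs before and after each call.
    static BookService wrap(BookService target) {
        InvocationHandler handler = (proxy, method, args) -> {
            events.add("before " + method.getName());
            Object result = method.invoke(target, args);
            events.add("after " + method.getName());
            return result;
        };
        return (BookService) Proxy.newProxyInstance(
                BookService.class.getClassLoader(), new Class<?>[] {BookService.class}, handler);
    }

    public static void main(String[] args) {
        BookService service = wrap(new BookServiceImpl());
        System.out.println(service.getBook(1));                      // book-1
        System.out.println(Proxy.isProxyClass(service.getClass())); // true
        System.out.println(events);
    }
}
```

The caller only ever sees the interface, which is why swapping the real bean for a proxy is transparent to the code that looks up bookService.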



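Executing the transaction

Whether the proxy was created with JDK dynamic proxies or with CGLIB, the methods called on it are handed to txAdvice first. As seen above, txAdvice is a TransactionInterceptor, and it delegates the actual transaction handling to org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction.

The following excerpt shows how the transaction is handled: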

// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

Object retVal;
try {
	// This is an around advice: Invoke the next interceptor in the chain.
	// This will normally result in a target object being invoked.
	retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
	// target invocation exception
	completeTransactionAfterThrowing(txInfo, ex);
	throw ex;
}
finally {
	cleanupTransactionInfo(txInfo);
}

if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
	// Set rollback-only in case of Vavr failure matching our rollback rules...
	TransactionStatus status = txInfo.getTransactionStatus();
	if (status != null && txAttr != null) {
		retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
	}
}

commitTransactionAfterReturning(txInfo);
return retVal;

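This matches how most people expect a transaction to be used: begin the transaction before the business code runs, commit after it succeeds, and attempt a rollback if it fails. The tx:advice configuration can also carry the isolation level, propagation behavior and other attributes, which are covered next.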
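
Stripped of the Spring types, the begin / proceed / commit-or-rollback shape can be sketched as follows. This is a simplified illustration, not the real invokeWithinTransaction: the class AroundTransactionSketch is invented, the "transaction" is just a list of recorded events, and every unchecked exception triggers a rollback without consulting any rollback rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class AroundTransactionSketch {

    // Records the demarcation calls in order; a real manager would talk to the database.
    static final List<String> log = new ArrayList<>();

    static <T> T invokeWithinTransaction(Supplier<T> businessCall) {
        log.add("begin");
        T result;
        try {
            // Around advice: run the business method inside the transaction.
            result = businessCall.get();
        } catch (RuntimeException | Error ex) {
            // Simplification: always roll back here, then let the caller see the exception.
            log.add("rollback");
            throw ex;
        }
        log.add("commit");
        return result;
    }

    public static void main(String[] args) {
        System.out.println(invokeWithinTransaction(() -> "ok"));
        try {
            invokeWithinTransaction(() -> {
                throw new IllegalStateException("boom");
            });
        } catch (IllegalStateException expected) {
            // The exception still reaches the caller after the rollback.
        }
        System.out.println(log); // [begin, commit, begin, rollback]
    }
}
```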



Preparing the transaction settings

Step into the implementation of createTransactionIfNecessary:

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
		@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

	// If no name specified, apply method identification as transaction name.
	if (txAttr != null && txAttr.getName() == null) {
		txAttr = new DelegatingTransactionAttribute(txAttr) {
			@Override
			public String getName() {
				return joinpointIdentification;
			}
		};
	}

	TransactionStatus status = null;
	if (txAttr != null) {
		if (tm != null) {
			status = tm.getTransaction(txAttr);
		}
		else {
			if (logger.isDebugEnabled()) {
				logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
						"] because no transaction manager has been configured");
			}
		}
	}
	return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

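There are two key parts here:

  1. Obtaining a TransactionStatus from the PlatformTransactionManager. AbstractPlatformTransactionManager encapsulates the logic of creating a transaction, while internally

    doGetTransaction, doBegin, doCommit, doRollback

    and similar methods are delegated to the concrete transaction manager. Depending on the technology chosen, that may be DataSourceTransactionManager, JtaTransactionManager, HibernateTransactionManager, JpaTransactionManager, and so on.
  2. The other part is prepareTransactionInfo, whose main job is to record the transaction on transactionInfoHolder, which is a ThreadLocal object.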
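
The thread-local bookkeeping in point 2 can be sketched like this: each info object remembers whatever was bound before it and puts it back when it is done, so nested calls on the same thread unwind in order. The class TransactionInfoHolderSketch and its Info type are simplified stand-ins invented for this example, and the joinpoint names are made up.

```java
class TransactionInfoHolderSketch {

    // Simplified stand-in for a transaction info object: remembers the info it replaced.
    static final class Info {
        final String joinpoint;
        private Info previous;

        Info(String joinpoint) {
            this.joinpoint = joinpoint;
        }

        // Make this info the current one for the thread, keeping the old one for later.
        void bindToThread() {
            previous = HOLDER.get();
            HOLDER.set(this);
        }

        // Put back whatever was current before this info was bound.
        void restoreThreadLocalStatus() {
            HOLDER.set(previous);
        }
    }

    private static final ThreadLocal<Info> HOLDER = new ThreadLocal<>();

    // Name of the joinpoint currently bound to this thread, or null if none.
    static String current() {
        Info info = HOLDER.get();
        return info == null ? null : info.joinpoint;
    }

    public static void main(String[] args) {
        Info outer = new Info("BookService.saveBook");
        outer.bindToThread();
        Info inner = new Info("AuditService.record");
        inner.bindToThread();
        System.out.println(current()); // AuditService.record
        inner.restoreThreadLocalStatus();
        System.out.println(current()); // BookService.saveBook
        outer.restoreThreadLocalStatus();
        System.out.println(current()); // null
    }
}
```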


Transaction management in DataSourceTransactionManager


doGetTransaction
protected Object doGetTransaction() {
	DataSourceTransactionObject txObject = new DataSourceTransactionObject();
	txObject.setSavepointAllowed(isNestedTransactionAllowed());
	ConnectionHolder conHolder =
			(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
	txObject.setConnectionHolder(conHolder, false);
	return txObject;
}

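This mainly obtains the data source, looks up the ConnectionHolder already bound to the current thread for it (null if there is none), and records it on the transaction object.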
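
The lookup relies on resources being bound per thread and keyed by the data source. A minimal sketch of that idea, assuming nothing beyond the JDK, might look like the following. ThreadBoundResourcesSketch is an invented class that only borrows the getResource / bindResource naming; a plain Object stands in for the DataSource and a String for the connection holder.

```java
import java.util.HashMap;
import java.util.Map;

class ThreadBoundResourcesSketch {

    // One map per thread: resource key (e.g. the DataSource) -> holder (e.g. the connection holder).
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static Object getResource(Object key) {
        return RESOURCES.get().get(key);
    }

    static void bindResource(Object key, Object value) {
        if (RESOURCES.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already a value bound for key " + key);
        }
    }

    static Object unbindResource(Object key) {
        return RESOURCES.get().remove(key);
    }

    public static void main(String[] args) throws InterruptedException {
        Object dataSource = new Object();             // stands in for the DataSource bean
        System.out.println(getResource(dataSource));  // null: nothing bound yet
        bindResource(dataSource, "connection-1");
        System.out.println(getResource(dataSource));  // connection-1

        // Another thread has its own map and therefore sees nothing.
        Thread other = new Thread(() -> System.out.println(getResource(dataSource)));
        other.start();
        other.join();
        unbindResource(dataSource);
    }
}
```

Because the binding is per thread, code that runs later in the same call chain can pick up the same connection without having it passed in explicitly.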



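Beginning the transaction: doBegin

A lot happens here, including obtaining a Connection, setting the isolation level, setting read-only, turning off auto-commit, and setting the timeout.

  1. The isolation level is set as follows: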

    // Apply specific isolation level, if any.
    Integer previousIsolationLevel = null;
    if (definition != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
    	if (debugEnabled) {
    		logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " +
    				definition.getIsolationLevel());
    	}
    	int currentIsolation = con.getTransactionIsolation();
    	if (currentIsolation != definition.getIsolationLevel()) {
    		previousIsolationLevel = currentIsolation;
    		con.setTransactionIsolation(definition.getIsolationLevel());
    	}
    }
    
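  2. The read-only setting (as the condition shows, this statement is only issued when enforceReadOnly is enabled)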

    if (isEnforceReadOnly() && definition.isReadOnly()) {
    	try (Statement stmt = con.createStatement()) {
    		stmt.executeUpdate("SET TRANSACTION READ ONLY");
    	}
    }
    
  3. If the Connection was newly obtained, it is bound to the current thread under the current dataSource, so that later code can reuse it directly
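
The isolation handling in step 1 saves the previous level so the connection can be put back afterwards; my understanding is that the saved value is restored during cleanup once the transaction completes. The sketch below illustrates that save-and-restore idea against a java.sql.Connection. It is not Spring's code: IsolationSketch and its methods are invented, the connection is an in-memory fake built with a dynamic proxy, and the ISOLATION_DEFAULT check from the excerpt is left out.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;

class IsolationSketch {

    // In-memory stand-in for a JDBC connection; only the two isolation methods work.
    static Connection fakeConnection(int initialIsolation) {
        int[] isolation = {initialIsolation};
        return (Connection) Proxy.newProxyInstance(
                IsolationSketch.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                (proxy, method, args) -> {
                    switch (method.getName()) {
                        case "getTransactionIsolation":
                            return isolation[0];
                        case "setTransactionIsolation":
                            isolation[0] = (Integer) args[0];
                            return null;
                        default:
                            throw new UnsupportedOperationException(method.getName());
                    }
                });
    }

    static int currentIsolation(Connection con) {
        try {
            return con.getTransactionIsolation();
        } catch (SQLException ex) {
            throw new IllegalStateException(ex);
        }
    }

    // Switches to the requested level if needed; returns the level to restore, or null.
    static Integer applyIsolation(Connection con, int requested) {
        try {
            int current = con.getTransactionIsolation();
            if (current == requested) {
                return null;
            }
            con.setTransactionIsolation(requested);
            return current;
        } catch (SQLException ex) {
            throw new IllegalStateException(ex);
        }
    }

    // Puts the connection back the way it was once the transaction is over.
    static void restoreIsolation(Connection con, Integer previous) {
        if (previous == null) {
            return;
        }
        try {
            con.setTransactionIsolation(previous);
        } catch (SQLException ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        Connection con = fakeConnection(Connection.TRANSACTION_REPEATABLE_READ);
        Integer previous = applyIsolation(con, Connection.TRANSACTION_READ_COMMITTED);
        System.out.println(currentIsolation(con) == Connection.TRANSACTION_READ_COMMITTED);  // true
        restoreIsolation(con, previous);
        System.out.println(currentIsolation(con) == Connection.TRANSACTION_REPEATABLE_READ); // true
    }
}
```

Restoring matters with pooled connections: without it, the next borrower of the connection would silently inherit the changed isolation level.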



Rolling back a failed transaction

Note that by default a rollback happens only for a RuntimeException or an Error:

public boolean rollbackOn(Throwable ex) {
	return (ex instanceof RuntimeException || ex instanceof Error);
}
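
A practical consequence of this default is that a checked exception such as IOException does not roll the transaction back unless it is listed in rollback-for. The sketch below illustrates that with a deliberately simplified rule check. RollbackRuleSketch is an invented class, and it tests exception types with isInstance, which as far as I know differs in detail from how Spring evaluates its rollback rules.

```java
import java.io.IOException;
import java.util.List;

class RollbackRuleSketch {

    // Default rule quoted above: unchecked exceptions and errors trigger a rollback.
    static boolean defaultRollbackOn(Throwable ex) {
        return (ex instanceof RuntimeException || ex instanceof Error);
    }

    // Simplified take on rollback-for: listed types (and their subclasses) also roll back.
    static boolean rollbackOn(Throwable ex, List<Class<? extends Throwable>> rollbackFor) {
        for (Class<? extends Throwable> type : rollbackFor) {
            if (type.isInstance(ex)) {
                return true;
            }
        }
        return defaultRollbackOn(ex);
    }

    public static void main(String[] args) {
        System.out.println(defaultRollbackOn(new IllegalStateException()));            // true
        System.out.println(defaultRollbackOn(new IOException()));                      // false
        System.out.println(rollbackOn(new IOException(), List.of(IOException.class))); // true
    }
}
```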



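Transaction propagation behavior

Isolation level, read-only and timeout are implemented by each concrete TransactionManager, whereas propagation behavior is managed by AbstractPlatformTransactionManager: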

	if (isExistingTransaction(transaction)) {
		// Existing transaction found -> check propagation behavior to find out how to behave.
		return handleExistingTransaction(def, transaction, debugEnabled);
	}

	// Check definition settings for new transaction.
	if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
		throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
	}

	// No existing transaction found -> check propagation behavior to find out how to proceed.
	if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
		throw new IllegalTransactionStateException(
				"No existing transaction found for transaction marked with propagation 'mandatory'");
	}
	else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
			def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
			def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
		SuspendedResourcesHolder suspendedResources = suspend(null);
		if (debugEnabled) {
			logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
		}
		try {
			return startTransaction(def, transaction, debugEnabled, suspendedResources);
		}
		catch (RuntimeException | Error ex) {
			resume(null, suspendedResources);
			throw ex;
		}
	}
	else {
		// Create "empty" transaction: no actual transaction, but potentially synchronization.
		if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
			logger.warn("Custom isolation level specified but no actual transaction initiated; " +
					"isolation level will effectively be ignored: " + def);
		}
		boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
		return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
	}

Propagation behavior mainly concerns nested transactional calls, so the focus is handleExistingTransaction:

private TransactionStatus handleExistingTransaction(
		TransactionDefinition definition, Object transaction, boolean debugEnabled)
		throws TransactionException {

	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
		throw new IllegalTransactionStateException(
				"Existing transaction found for transaction marked with propagation 'never'");
	}

	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
		if (debugEnabled) {
			logger.debug("Suspending current transaction");
		}
		Object suspendedResources = suspend(transaction);
		boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
		return prepareTransactionStatus(
				definition, null, false, newSynchronization, debugEnabled, suspendedResources);
	}

	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
		if (debugEnabled) {
			logger.debug("Suspending current transaction, creating new transaction with name [" +
					definition.getName() + "]");
		}
		SuspendedResourcesHolder suspendedResources = suspend(transaction);
		try {
			return startTransaction(definition, transaction, debugEnabled, suspendedResources);
		}
		catch (RuntimeException | Error beginEx) {
			resumeAfterBeginException(transaction, suspendedResources, beginEx);
			throw beginEx;
		}
	}

	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
		if (!isNestedTransactionAllowed()) {
			throw new NestedTransactionNotSupportedException(
					"Transaction manager does not allow nested transactions by default - " +
					"specify 'nestedTransactionAllowed' property with value 'true'");
		}
		if (debugEnabled) {
			logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
		}
		if (useSavepointForNestedTransaction()) {
			// Create savepoint within existing Spring-managed transaction,
			// through the SavepointManager API implemented by TransactionStatus.
			// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
			DefaultTransactionStatus status =
					prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
			status.createAndHoldSavepoint();
			return status;
		}
		else {
			// Nested transaction through nested begin and commit/rollback calls.
			// Usually only for JTA: Spring synchronization might get activated here
			// in case of a pre-existing JTA transaction.
			return startTransaction(definition, transaction, debugEnabled, null);
		}
	}

	// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
	if (debugEnabled) {
		logger.debug("Participating in existing transaction");
	}
	if (isValidateExistingTransaction()) {
		if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
			Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
			if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
				Constants isoConstants = DefaultTransactionDefinition.constants;
				throw new IllegalTransactionStateException("Participating transaction with definition [" +
						definition + "] specifies isolation level which is incompatible with existing transaction: " +
						(currentIsolationLevel != null ?
								isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
								"(unknown)"));
			}
		}
		if (!definition.isReadOnly()) {
			if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
				throw new IllegalTransactionStateException("Participating transaction with definition [" +
						definition + "] is not marked as read-only but existing transaction is");
			}
		}
	}
	boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
	return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

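In summary, when a transaction already exists:

  1. PROPAGATION_NEVER: running inside an existing transaction is not allowed, so an IllegalTransactionStateException is thrown.
  2. PROPAGATION_NOT_SUPPORTED: the existing transaction is suspended and the business code runs without a transaction.
  3. PROPAGATION_REQUIRES_NEW: the existing transaction is suspended and a new one is started, so the inner transaction commits or rolls back independently of the outer one.
  4. PROPAGATION_REQUIRED: no new transaction is started. The call joins the existing one as a logical transaction, so what happens in the inner part affects the outer one.
  5. PROPAGATION_NESTED: a savepoint is used, which typically works only with JDBC resource transactions.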
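
The two code excerpts above boil down to a decision table over the propagation setting and whether a transaction already exists. The sketch below writes that table out as plain Java. PropagationSketch and its enum are invented for this illustration, the returned strings are informal descriptions rather than Spring API, and the NESTED case assumes the savepoint path rather than the JTA path.

```java
class PropagationSketch {

    enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

    // Describes what happens for a given propagation setting and transaction state.
    static String decide(Propagation propagation, boolean existingTransaction) {
        if (existingTransaction) {
            switch (propagation) {
                case NEVER:
                    throw new IllegalStateException("Existing transaction found for 'never'");
                case NOT_SUPPORTED:
                    return "suspend outer, run without transaction";
                case REQUIRES_NEW:
                    return "suspend outer, start new transaction";
                case NESTED:
                    return "create savepoint in outer transaction";
                default:
                    return "join outer transaction"; // REQUIRED, SUPPORTS, MANDATORY
            }
        }
        switch (propagation) {
            case MANDATORY:
                throw new IllegalStateException("No existing transaction found for 'mandatory'");
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                return "start new transaction";
            default:
                return "run without transaction"; // SUPPORTS, NOT_SUPPORTED, NEVER
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(Propagation.REQUIRED, false));     // start new transaction
        System.out.println(decide(Propagation.REQUIRED, true));      // join outer transaction
        System.out.println(decide(Propagation.REQUIRES_NEW, true));  // suspend outer, start new transaction
        System.out.println(decide(Propagation.NESTED, true));        // create savepoint in outer transaction
    }
}
```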



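Copyright notice: this is an original article by wisfy_21, published under the CC 4.0 BY-SA license. When reposting, please include a link to the original source and this notice.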