Starting from a simple JDBC TX configuration
Basic configuration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/aop https://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/tx https://www.springframework.org/schema/tx/spring-tx.xsd">
<bean id="bookService" class="testtx.impl.BookServiceImpl"/>
<tx:advice id="txAdvice" transaction-manager="txManager">
<tx:attributes>
<tx:method name="get*" read-only="true"/>
<tx:method name="*"/>
</tx:attributes>
</tx:advice>
<aop:config>
<aop:pointcut id="bookServiceOp" expression="execution(* testtx.BookService.*(..))"/>
<aop:advisor advice-ref="txAdvice" pointcut-ref="bookServiceOp" id="advisor0"/>
</aop:config>
<bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:13306/test"/>
<property name="username" value="root"/>
<property name="password" value="12345678"/>
</bean>
<bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
</beans>
Overview of the processing flow
First, let's look at the beans that get created and their types.
ID | class |
---|---|
bookService | com.sun.proxy.$Proxy15 |
txAdvice | org.springframework.transaction.interceptor.TransactionInterceptor |
org.springframework.aop.config.internalAutoProxyCreator | org.springframework.aop.aspectj.autoproxy.AspectJAwareAdvisorAutoProxyCreator |
bookServiceOp | org.springframework.aop.aspectj.AspectJExpressionPointcut |
advisor0 | org.springframework.aop.support.DefaultBeanFactoryPointcutAdvisor |
dataSource | org.apache.commons.dbcp2.BasicDataSource |
txManager | org.springframework.jdbc.datasource.DataSourceTransactionManager |
- bookService is actually a proxy object. Calls to its methods are intercepted and then handled by txAdvice.
- txAdvice is a TransactionInterceptor object, which manages the transaction. Its invoke method is implemented as follows:
  @Override
  @Nullable
  public Object invoke(MethodInvocation invocation) throws Throwable {
      // Work out the target class: may be {@code null}.
      // The TransactionAttributeSource should be passed the target class
      // as well as the method, which may be from an interface.
      Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
      // Adapt to TransactionAspectSupport's invokeWithinTransaction...
      return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
          @Override
          @Nullable
          public Object proceedWithInvocation() throws Throwable {
              return invocation.proceed();
          }
          @Override
          public Object getTarget() {
              return invocation.getThis();
          }
          @Override
          public Object[] getArguments() {
              return invocation.getArguments();
          }
      });
  }
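With that rough picture in mind, let's walk through how the whole flow works.
tx:advice (the advice)
The handler for the tx namespace, TxNamespaceHandler, can be found in the spring.handlers file of the spring-tx module: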
http\://www.springframework.org/schema/tx=org.springframework.transaction.config.TxNamespaceHandler
public class TxNamespaceHandler extends NamespaceHandlerSupport {
static final String TRANSACTION_MANAGER_ATTRIBUTE = "transaction-manager";
static final String DEFAULT_TRANSACTION_MANAGER_BEAN_NAME = "transactionManager";
static String getTransactionManagerName(Element element) {
return (element.hasAttribute(TRANSACTION_MANAGER_ATTRIBUTE) ?
element.getAttribute(TRANSACTION_MANAGER_ATTRIBUTE) : DEFAULT_TRANSACTION_MANAGER_BEAN_NAME);
}
@Override
public void init() {
registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
}
}
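The TxNamespaceHandler implementation tells us the following:
- The transaction manager used by tx:advice is given by the transaction-manager attribute; if the attribute is absent, the bean with id transactionManager is used
- The advice element is parsed and registered as a BeanDefinition by TxAdviceBeanDefinitionParser
Registering tx:advice: TxAdviceBeanDefinitionParser
There are several parts to handle: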
- The bean type, which is registered as TransactionInterceptor.class
  @Override
  protected Class<?> getBeanClass(Element element) {
      return TransactionInterceptor.class;
  }
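- The name of the transactionManager bean, which can be set through the attribute and otherwise defaults to "transactionManager"
- Processing the attributes element, which is the key part. Its child elements are <tx:method>, and each method carries its own transaction settings:
  - name, the method name pattern. For example "*" matches every method and "get*" matches every method whose name starts with get
  - propagation, the propagation behavior. The default is REQUIRED; REQUIRES_NEW and NESTED are also common
  - isolation, the isolation level. The default is DEFAULT; the other values are READ_UNCOMMITTED, READ_COMMITTED, REPEATABLE_READ, and SERIALIZABLE
  - timeout, the transaction timeout in seconds. The default is -1, meaning no explicit timeout
  - read-only, whether the transaction is read-only. The default is false
  - rollback-for, the exceptions that trigger a rollback. Multiple values are separated by commas
  - no-rollback-for, the exceptions that do not trigger a rollback. Multiple values are separated by commas
The handling of attributes can be found in TxAdviceBeanDefinitionParser#parseAttributeSource. Once the attributes element has been processed, the result is recorded on the tx:advice BeanDefinition as the value of transactionAttributeSource.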
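To make the name matching concrete, here is a minimal sketch of how rules such as "get*" and "*" could be resolved for a method name. It assumes that the longest matching pattern wins, which is my understanding of how name-based matching behaves; NameMatchSketch, matches, and resolve are hypothetical names, the wildcard support is deliberately simplified, and the attribute is reduced to a plain string.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class NameMatchSketch {
    // Simplified wildcard match: only "*", "xxx*", "*xxx", and exact names are supported.
    static boolean matches(String pattern, String methodName) {
        if (pattern.equals("*")) {
            return true;
        }
        if (pattern.endsWith("*")) {
            return methodName.startsWith(pattern.substring(0, pattern.length() - 1));
        }
        if (pattern.startsWith("*")) {
            return methodName.endsWith(pattern.substring(1));
        }
        return pattern.equals(methodName);
    }

    // Returns the attribute of the longest matching pattern (assumption), or null if none match.
    static String resolve(Map<String, String> rules, String methodName) {
        String bestPattern = null;
        String bestAttribute = null;
        for (Map.Entry<String, String> rule : rules.entrySet()) {
            String pattern = rule.getKey();
            boolean moreSpecific = bestPattern == null || pattern.length() > bestPattern.length();
            if (matches(pattern, methodName) && moreSpecific) {
                bestPattern = pattern;
                bestAttribute = rule.getValue();
            }
        }
        return bestAttribute;
    }

    public static void main(String[] args) {
        // Mirrors the two <tx:method> entries from the configuration above.
        Map<String, String> rules = new LinkedHashMap<>();
        rules.put("get*", "read-only");
        rules.put("*", "default");
        System.out.println(resolve(rules, "getBook"));
        System.out.println(resolve(rules, "saveBook"));
    }
}
```

With the two rules from the example configuration, getBook resolves to the read-only settings while every other method falls through to the catch-all entry.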
TxAdviceBeanDefinitionParser.doParse is implemented as follows:
@Override
protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) {
builder.addPropertyReference("transactionManager", TxNamespaceHandler.getTransactionManagerName(element));
List<Element> txAttributes = DomUtils.getChildElementsByTagName(element, ATTRIBUTES_ELEMENT);
if (txAttributes.size() > 1) {
parserContext.getReaderContext().error(
"Element <attributes> is allowed at most once inside element <advice>", element);
}
else if (txAttributes.size() == 1) {
// Using attributes source.
Element attributeSourceElement = txAttributes.get(0);
RootBeanDefinition attributeSourceDefinition = parseAttributeSource(attributeSourceElement, parserContext);
builder.addPropertyValue("transactionAttributeSource", attributeSourceDefinition);
}
else {
// Assume annotations source.
builder.addPropertyValue("transactionAttributeSource",
new RootBeanDefinition("org.springframework.transaction.annotation.AnnotationTransactionAttributeSource"));
}
}
Creating the proxy object
<aop:config>
<aop:pointcut id="bookServiceOp" expression="execution(* testtx.BookService.*(..))"/>
<aop:advisor advice-ref="txAdvice" pointcut-ref="bookServiceOp" id="advisor0"/>
</aop:config>
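The aop namespace is handled by AopNamespaceHandler. Before it starts parsing the aop configuration, it registers an infrastructure BeanDefinition, which is instantiated ahead of all user-defined beans. This is the org.springframework.aop.aspectj.autoproxy.AspectJAwareAdvisorAutoProxyCreator that appeared in the bean table above.
AspectJAwareAdvisorAutoProxyCreator is an implementation of InstantiationAwareBeanPostProcessor. When a bean that matches the pointcut (bookService in this example) is created, it builds a proxy object and returns that instead. This is why the type you get when looking up bookService is a proxy type.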
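As a rough illustration of why the bean ends up with a type like $Proxy15, the sketch below wraps an object in a JDK dynamic proxy so that every call passes through an interceptor first. GreetingService, GreetingServiceImpl, wrapIfNeeded, and the EVENTS list are hypothetical stand-ins; this only mimics the idea and is not how Spring's auto-proxy creator is actually written.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

final class ProxySketch {
    // Records what the interceptor saw, so the interception is observable.
    static final List<String> EVENTS = new ArrayList<>();

    // Roughly what a post-processor does: hand back a proxy instead of the raw bean.
    static Object wrapIfNeeded(Object bean) {
        Class<?>[] interfaces = bean.getClass().getInterfaces();
        if (interfaces.length == 0) {
            return bean; // a JDK proxy needs at least one interface
        }
        InvocationHandler handler = (proxy, method, methodArgs) -> {
            EVENTS.add("before " + method.getName());
            try {
                return method.invoke(bean, methodArgs);
            } catch (InvocationTargetException ex) {
                throw ex.getCause(); // surface the target's own exception
            } finally {
                EVENTS.add("after " + method.getName());
            }
        };
        return Proxy.newProxyInstance(bean.getClass().getClassLoader(), interfaces, handler);
    }

    public static void main(String[] args) {
        GreetingService service = (GreetingService) wrapIfNeeded(new GreetingServiceImpl());
        System.out.println(service.getGreeting("tx"));
        System.out.println(Proxy.isProxyClass(service.getClass()));
        System.out.println(EVENTS);
    }
}

// Hypothetical service interface, standing in for testtx.BookService.
interface GreetingService {
    String getGreeting(String name);
}

class GreetingServiceImpl implements GreetingService {
    @Override
    public String getGreeting(String name) {
        return "hello " + name;
    }
}
```

The caller only ever sees the interface, so swapping the raw object for a proxy is transparent to it.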
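Executing the transaction
Whether the proxy was created with a JDK dynamic proxy or with CGLIB, the methods called on it are handled first by txAdvice. As seen above, txAdvice is a TransactionInterceptor, and the actual transaction handling is delegated to org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction.
Let's first look at a fragment of that method to see how the transaction is handled: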
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
commitTransactionAfterReturning(txInfo);
return retVal;
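The way the transaction is used is roughly what you would expect: begin the transaction before the business code runs, commit after it succeeds, and attempt a rollback if it fails. A tx:advice can also configure attributes such as the isolation level and the propagation behavior, so let's look at those next.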
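The begin, commit, and rollback order described above can be sketched without any Spring types. In this minimal example the "transaction" is just a log of events; AroundTxSketch, inTransaction, and LOG are hypothetical names, and the rollback condition is reduced to "any unchecked exception or Error".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class AroundTxSketch {
    // Stand-in for a real transaction: records the calls that would be made.
    static final List<String> LOG = new ArrayList<>();

    static <T> T inTransaction(Supplier<T> business) {
        LOG.add("begin");
        T result;
        try {
            // Around advice: run the business code inside the transaction.
            result = business.get();
        } catch (RuntimeException | Error ex) {
            // Failure path: roll back, then let the exception propagate.
            LOG.add("rollback");
            throw ex;
        }
        // Success path: commit only after the business code returned normally.
        LOG.add("commit");
        return result;
    }

    public static void main(String[] args) {
        System.out.println(inTransaction(() -> "saved"));
        try {
            inTransaction(() -> {
                throw new IllegalStateException("boom");
            });
        } catch (IllegalStateException expected) {
            System.out.println("failed: " + expected.getMessage());
        }
        System.out.println(LOG);
    }
}
```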
Preparing the transaction configuration
Step into the implementation of createTransactionIfNecessary:
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
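There are two key parts here:
- Obtaining a TransactionStatus from the PlatformTransactionManager. AbstractPlatformTransactionManager encapsulates the logic for creating a transaction, while the internal doGetTransaction, doBegin, doCommit, and doRollback steps are delegated to the concrete transaction manager. Depending on the technology chosen, that may be DataSourceTransactionManager, JtaTransactionManager, HibernateTransactionManager, JpaTransactionManager, and so on.
- The other part is prepareTransactionInfo, whose main job is to record the transaction information in transactionInfoHolder, which is a ThreadLocal object.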
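The thread-local bookkeeping in the second point can be sketched as follows. The idea, as I understand it, is that each new info object remembers whatever was bound before it, so that nested calls can restore the outer one when they finish. TxInfoHolderSketch, Info, bind, and restore are hypothetical names, not the real classes.

```java
final class TxInfoHolderSketch {
    // Hypothetical stand-in for the per-invocation transaction info.
    static final class Info {
        final String name;
        Info previous; // whatever was bound to the thread before this one

        Info(String name) {
            this.name = name;
        }
    }

    private static final ThreadLocal<Info> HOLDER = new ThreadLocal<>();

    // Bind a new info to the current thread, remembering the old one.
    static Info bind(String name) {
        Info info = new Info(name);
        info.previous = HOLDER.get();
        HOLDER.set(info);
        return info;
    }

    // Restore the info that was current before the given one was bound.
    static void restore(Info info) {
        HOLDER.set(info.previous);
    }

    static String currentName() {
        Info info = HOLDER.get();
        return info == null ? null : info.name;
    }

    public static void main(String[] args) {
        Info outer = bind("outerMethod");
        Info inner = bind("innerMethod");
        System.out.println(currentName());
        restore(inner);
        System.out.println(currentName());
        restore(outer);
        System.out.println(currentName());
    }
}
```

Because the holder is a ThreadLocal, each thread sees only its own chain of infos.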
Transaction management in DataSourceTransactionManager
doGetTransaction
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
This mainly obtains the data source, looks up any Connection that already exists for it (null if there is none), and records it on the transaction object.
Starting the transaction: doBegin
A lot happens here, including obtaining the Connection, configuring the isolation level, setting read-only, turning off auto-commit, and setting the timeout.
- Setting the isolation level:
  // Apply specific isolation level, if any.
  Integer previousIsolationLevel = null;
  if (definition != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
      if (debugEnabled) {
          logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " +
                  definition.getIsolationLevel());
      }
      int currentIsolation = con.getTransactionIsolation();
      if (currentIsolation != definition.getIsolationLevel()) {
          previousIsolationLevel = currentIsolation;
          con.setTransactionIsolation(definition.getIsolationLevel());
      }
  }
- Setting read-only:
  if (isEnforceReadOnly() && definition.isReadOnly()) {
      try (Statement stmt = con.createStatement()) {
          stmt.executeUpdate("SET TRANSACTION READ ONLY");
      }
  }
- If the Connection is newly created, it is bound to the current thread with the dataSource as the key, so that later code can reuse it directly
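The binding in the last step is what lets doGetTransaction find an existing connection later on. A minimal sketch of such a per-thread registry, keyed by the data source, could look like this. ResourceBindingSketch and its methods are hypothetical and only imitate the idea; a plain Object stands in for the data source and a String for the connection holder.

```java
import java.util.HashMap;
import java.util.Map;

final class ResourceBindingSketch {
    // One map per thread: key = data source, value = the connection bound for it.
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static void bindResource(Object key, Object value) {
        if (RESOURCES.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already a value bound for key " + key);
        }
    }

    static Object getResource(Object key) {
        return RESOURCES.get().get(key);
    }

    static Object unbindResource(Object key) {
        return RESOURCES.get().remove(key);
    }

    public static void main(String[] args) throws InterruptedException {
        Object dataSource = new Object();
        bindResource(dataSource, "connection-1");
        System.out.println(getResource(dataSource));

        // Another thread does not see the binding made on this thread.
        Object[] seenElsewhere = new Object[1];
        Thread other = new Thread(() -> seenElsewhere[0] = getResource(dataSource));
        other.start();
        other.join();
        System.out.println(seenElsewhere[0]);

        unbindResource(dataSource);
        System.out.println(getResource(dataSource));
    }
}
```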
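Rolling back when the transaction fails
Note that by default a rollback only happens when a RuntimeException or an Error is thrown. A checked exception does not trigger a rollback unless it is configured through rollback-for.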
public boolean rollbackOn(Throwable ex) {
return (ex instanceof RuntimeException || ex instanceof Error);
}
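To show how rollback-for and no-rollback-for could interact with this default, here is a simplified sketch in which each rule names an exception class and the rule closest to the thrown exception in the class hierarchy wins. This is an assumption made for illustration; RollbackRuleSketch, Rule, and depth are hypothetical, and the rules here compare classes rather than configured names.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;

final class RollbackRuleSketch {
    // Hypothetical rule: an exception type plus whether it means "roll back".
    static final class Rule {
        final Class<? extends Throwable> type;
        final boolean rollback;

        Rule(Class<? extends Throwable> type, boolean rollback) {
            this.type = type;
            this.rollback = rollback;
        }
    }

    // Number of superclass steps from thrown to ruleType, or -1 if unrelated.
    static int depth(Class<?> thrown, Class<?> ruleType) {
        int steps = 0;
        for (Class<?> c = thrown; c != null; c = c.getSuperclass(), steps++) {
            if (c == ruleType) {
                return steps;
            }
        }
        return -1;
    }

    static boolean rollbackOn(Throwable ex, List<Rule> rules) {
        Rule winner = null;
        int best = Integer.MAX_VALUE;
        for (Rule rule : rules) {
            int d = depth(ex.getClass(), rule.type);
            if (d >= 0 && d < best) {
                best = d;
                winner = rule;
            }
        }
        if (winner == null) {
            // No rule applies: fall back to the default shown above.
            return ex instanceof RuntimeException || ex instanceof Error;
        }
        return winner.rollback;
    }

    public static void main(String[] args) {
        List<Rule> rules = List.of(
                new Rule(Exception.class, true),                // like rollback-for
                new Rule(FileNotFoundException.class, false));  // like no-rollback-for
        System.out.println(rollbackOn(new IOException("io"), List.of()));
        System.out.println(rollbackOn(new IOException("io"), rules));
        System.out.println(rollbackOn(new FileNotFoundException("missing"), rules));
    }
}
```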
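Transaction propagation behavior
The isolation level, read-only flag, and timeout are implemented by each concrete TransactionManager, whereas the propagation behavior is managed by AbstractPlatformTransactionManager: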
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(def, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
Propagation behavior mainly concerns the case of nested transactions, so the focus is handleExistingTransaction:
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
return startTransaction(definition, transaction, debugEnabled, null);
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
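In summary:
- With PROPAGATION_NEVER, running inside an existing transaction is not allowed and an IllegalTransactionStateException is thrown
- With PROPAGATION_NOT_SUPPORTED, the current transaction is suspended and the business code runs without a transaction
- With PROPAGATION_REQUIRES_NEW, the current transaction is suspended and a new one is started, so the inner transaction commits or rolls back independently of the outer one
- With PROPAGATION_REQUIRED, no new transaction is started. The call joins the existing one as a logical transaction, so the outcome of the inner call affects the outer transaction
- With PROPAGATION_NESTED, a savepoint is used, which only works together with JDBC resource transactions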
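The two code paths shown above (no existing transaction, and handleExistingTransaction) can be condensed into one decision function. The sketch below only returns a description of what would happen; PropagationSketch, its enum, and the result strings are hypothetical, and the NESTED case assumes the savepoint branch rather than the nested begin used for JTA.

```java
final class PropagationSketch {
    enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

    // Describes the outcome for a propagation setting, given whether a transaction already exists.
    static String decide(Propagation propagation, boolean existingTransaction) {
        if (existingTransaction) {
            switch (propagation) {
                case NEVER:
                    return "throw IllegalTransactionStateException";
                case NOT_SUPPORTED:
                    return "suspend current, run without transaction";
                case REQUIRES_NEW:
                    return "suspend current, start new transaction";
                case NESTED:
                    return "create savepoint in current transaction";
                default: // REQUIRED, SUPPORTS, MANDATORY
                    return "participate in current transaction";
            }
        }
        switch (propagation) {
            case MANDATORY:
                return "throw IllegalTransactionStateException";
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                return "start new transaction";
            default: // SUPPORTS, NOT_SUPPORTED, NEVER
                return "run without transaction";
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            System.out.println(p + " | existing: " + decide(p, true) + " | none: " + decide(p, false));
        }
    }
}
```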