在Spring与Hibernate整合的时候,可以利用Spring的事务管理机制,为我们管理事务的开启、提交、回滚等操作。这样的方式极大的减少了我们的代码量,让我们只专注于业务代码的编写。在使用Hibernate的时候,每一个操作都要经历事务开启与提交这样的操作,他们在业务代码的周围,这样来看是不是就想到了AOP编程,把这部分代码抽取出来。没错,Spring正是这样做的,Spring的事务管理就是基于AOP的。
1 Spring的事务隔离与传播
Srping的事务定义了五个隔离等级(isolation)与7个传播行为(propagation)。他们与我们了解的数据库事务有什么区别与联系呢?先介绍一下事务隔离与传播的概念
-
事务隔离:
当前事务和其他事务的隔离成都。例如,这个事务能否看到其他事务未提交的数据等。(read-uncommitted)
-
事务传播:
通常,在事务中执行的代码都会在当前事务中运行。但是,如果一个事务上下文已经存在,有几个选项可以指定该事务性方法的执行行为。如挂起、创建一个新的事务、嵌套执行等等。
REQUIRED | 要求在当前事务环境中执行该方法,如果已处于当前环境,直接调用,否则启动新的事务执行该方法 |
SUPPORTS | 如果当前执行线程处于事务环境中,则使用当前事务,否则不使用事务 |
MANDATORY | 要求调用该方法的线程必须处于事务环境中,否则抛出异常 |
REQUIRES_NEW | 该方法要求在新的事务环境中执行,如果当前线程已处于事务中,则先挂起该事务,启动新的事务;如果不处于事务中,则启动熄灯呢事务 |
NOT_SUPPORTED | 如果调用该方法的线程处于事务中,则先暂停该事务,然后执行该方法 |
NEVER | 不允许调用该方法的线程处于事务中,如果该线程处于事务中,抛出异常 |
NESTED | 如果已经处于事务中,启动新的事务并嵌套执行 |
这里面的描述视乎不太容易理解,在下面的源码分析中,就会看到他们是如何起作用的了。到时候再对照这些描述,就很容易理解了。
2 Spring事务的基本流程
Spring的事务过程与我们理解的AOP过程是很相似的,它大致分为两个阶段,图中蓝色区域表示的是事务的准备阶段,这个阶段主要完成了事务的准备工作,包括事务属性的读取,事务的创建等工作。这部分代码基本定义在AbstractPlatformTransactionManager类中,也就是说他的大部分代码是与平台无关的;第二个阶段是图中绿色区域,这里进入到事务的实施阶段,包括事务的开启、提交、回滚等操作。这部分代码对于不同的ORM框架来说是不相同的,所以他们都定义在各自的transactionManager中,这也是
典型的策略模式
,本文将以HibernateTransactionManager源码进行分析
。
3 事务准备工作
3.1 事务从哪里开始?
先来看applicationContext中与事务有关的配置文档
<!-- 配置事务增强处理Bean,指定事务管理器 -->
<tx:advice id="txAdvice" transaction-manager="transactionManager">
<!-- 用于配置详细的事务语义 -->
<tx:attributes>
<!-- 所有以'get'开头的方法是read-only的 -->
<tx:method name="get*" read-only="true" />
<!-- 其他方法使用默认的事务设置 -->
<tx:method name="*" propagation="REQUIRED" isolation="DEFAULT"/>
</tx:attributes>
</tx:advice>
<bean id="test" class="com.songxu.entity.Log"></bean>
<aop:config expose-proxy="true">
<!-- 只对业务逻辑层实施事务 -->
<aop:pointcut id="txPointcut" expression="execution(* com.songxu.entity.*.*(..))" />
<!-- Advisor定义,切入点和通知分别为txPointcut、txAdvice -->
<aop:advisor pointcut-ref="txPointcut" advice-ref="txAdvice" />
</aop:config>
笔者在分析源码之前,这个<tx:advice>标签是如何被包装成advisor的,这必须要看一看它的schema文件了,这里截取了一小段来看
看到这里是不是有点感觉了,这个<tx:advice>实际对应的是TransactionInterceptor这个类,它配置的属性最后都作为事务的属性注入到这个类中。找到这个类,也就找到了我们熟悉的invoke方法。对于XXXInterceptor这样的类,它一定是为代理对象服务的。但是我们似乎没有明确的指定出那个类或接口作为目标对象。笔者又翻看了log输出,在其中找到了答案。
其实在我们指定<aop:pointcut>的时候,对应的表达式指定的就是那些类被作为代理类。我们通常在这里会指定一个包的范围,他们或是service层或是dao层。因为数据库事务通常需要在这里开启。当我们调用这些类里面的方法时候,通常也是在访问数据库的过程,这时就会执行invoke方法,进入了AOP事务增强的方法链,也就完成了事务的工作。
3.2 invoke
invoke方法是输入aop的入口,对于事务也不例外。从这里开始,就进入到了Spring 事务阶段。它实际调用的是TransactionAspectSupport类的invokeWithinTransaction方法
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
//这里实际调用TransactionAspectSupport类的方法
return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
@Override
// 注册一个回调方法 回调实际业务方法
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
});
}
invokeWithinTransaction 方法
它实际上是整个事务过程的纲领性方法。所有的过程都在这里完成,这个过程也很清晰。首先读取了事务的配置属于,然后得到事务的处理器,获得一个事务存放在TransactionInfo里面(这里实际暗含了开启事务的操作,后面会介绍到),然后调用目标方法,然后根据实际情况提交或是回滚,最后释放掉TranscationInfo对象。
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
throws Throwable {
// 读取事务的属性配置
final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
//获得具体的事务处理器
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass);
/**
*这里区分不同类型的PlatformTransactionManager 因为他们的调用方式不同
对于CallbackPreferringPlatformTransactionManager来说,它需要以回调函数的方式实现事务的创建与提交
*/
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
//获取事务,并把他们放到TransactionInfo中。
//这个TransactionInfo就是一个小的容器,里面包含了与事务有关的属性信息。在事务关闭的时候需要释放掉
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
//这里非常熟悉,通过方法链调用来执行目标方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 如果发生了异常,需要根据实际情况回滚或提交
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
//释放掉TransactionInfo
cleanupTransactionInfo(txInfo);
}
//进行事务提交
commitTransactionAfterReturning(txInfo);
return retVal;
}
//采用回调方法使用事务处理器
else {
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
new TransactionCallback<Object>() {
@Override
public Object doInTransaction(TransactionStatus status) {
TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
try {
return invocation.proceedWithInvocation();
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
return new ThrowableHolder(ex);
}
}
finally {
cleanupTransactionInfo(txInfo);
}
}
});
// Check result: It might indicate a Throwable to rethrow.
if (result instanceof ThrowableHolder) {
throw ((ThrowableHolder) result).getThrowable();
}
else {
return result;
}
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
}
}
3.3 创建事务(createTransactionIfNecessary && getTransaction)
在事务开启之前,首先要有事务,这就是第一步创建或获取事务的过程。这个过程的入口是
createTransactionIfNecessary 方法。
3.3.1
createTransactionIfNecessary
在这个方法中,可以看到两个重要的数据对象TransactionSatus和TransactionInfo的创建,这两个对象持有的数据是事务处理器对事务进行处理的重要依据,这两耳光对象的使用贯穿整个事务处理的过程。在这个方法的最后一行是对另外一个准备方法的调用,它构造了一个TransactionInfo对象,并把这个对象绑定到了当前线程中,同时在TransactionInfo对象中由一个变量来保存以前的TransactionInfo。
这样就有了一连串的TransactionInfo,虽然不一定总是创建新的事务,但是一定会创建这样一个TransactionInfo对象。
protected TransactionInfo createTransactionIfNecessary(
PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
// 如果没有指定名字,使用方法特征作为事务名
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
// 这个TransactionStatus封装了事务执行的状态信息
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
/*
这里使用定义好的属性信息创建事务
事务创建通过事务管理器来完成,同时返回状态信息
*/
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// 构造一个TransactionInfo对象封装事务的信息,并把这个对象与线程绑定
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
3.3.2 getTransaction 获取事务
这个方法定义了获取事务的基本方法,在里面根据我们配置的不同,来创建或挂起事务,这些配置就是我们最开始提到的事务传播行为。
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
Object transaction = doGetTransaction();
// 缓存log的debug开关,避免总是去检验
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
// 如果没有设置事务属性,那么使用默认的事务
definition = new DefaultTransactionDefinition();
}
if (isExistingTransaction(transaction)) {
// 如果当前线程已经存在一个事务,那么就按照存在的方式去处理
return handleExistingTransaction(definition, transaction, debugEnabled);
}
//当前线程不存在事务,就会创建一个新的事务
// Check definition settings for new transaction.
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// 如果指定了事务传播为PROPAGATION_MANDATORY 那么又没有当前事务,就会抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
//如果指定了传播为PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED 就创建一个新的事务
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
//不需要挂起任何事务
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//创建与开始事务的实际调用,这是由具体的事务管理器来完成的,例如HibernateTransactionManager
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException ex) {
resume(null, suspendedResources);
throw ex;
}
catch (Error err) {
resume(null, suspendedResources);
throw err;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
对于创建事务,当然还存在一种情况就是当前线程已经存在了一个事务,那么就需要handleExistingTransaction这个方法去解决这一问题。
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
//如果设置为PROPAGATION_NEVER,又存在一个事务,就要抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
//如果设置为PROPAGATION_NOT_SUPPORTED,又存在一个事务,就将这个事务挂起
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
//这里的前两个参数为null和false,说明事务不需要放在事务环境中,同时挂起的事务也被保存在这里
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
//如果设置为PROPAGATION_REQUIRES_NEW ,则创建新的事务,把当前事务挂起
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 这里保存了新的事务信息,同时也保存了挂起的事务信息
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
catch (Error beginErr) {
resumeAfterBeginException(transaction, suspendedResources, beginErr);
throw beginErr;
}
}
//创建嵌套的事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
4 基于Hibernate的事务管理器
4.1事务的开启
在前面的getTransaction方法中有一个doBegin方法,它是事务开启的实际调用方法。这个方法也是由不同的平台去实现的,来看一看HibernateTransactionManager是如何实现这个方法的。
这个过程大致分为两个阶段,第一构造一个sessionHolder对象,这里面封装了HibernateSession 以及HibernateTransaction ,如果session或Transaction不存在,需要通过sessionFactory获得;第二个阶段就是开启transaction,即transaction.begin();同时把sessionHolder绑定到线程。
在这里有一个步骤是在设置flushmode,FlushMode是session的刷新模式,它指定了session在查询、flush或commit方法时的动作,设置为auto的时候,以前三个动作都会清理session缓存,如果设置为NEVER(MANUAL)时,只有在flush时清理缓存。
protected void doBegin(Object transaction, TransactionDefinition definition) {
//将事务转换为Hibernage事务组件
HibernateTransactionObject txObject = (HibernateTransactionObject) transaction;
if (txObject.hasConnectionHolder() && !txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
throw new IllegalTransactionStateException(
"Pre-bound JDBC Connection found! HibernateTransactionManager does not support " +
"running within DataSourceTransactionManager if told to manage the DataSource itself. " +
"It is recommended to use a single HibernateTransactionManager for all transactions " +
"on a single DataSource, no matter whether Hibernate or JDBC access.");
}
Session session = null;
//如果SessionHolder还没有被创建,就创建在一个新的Hibernate session,并放入到SessionHolder中
//这个SessionHolder稍后会绑定到线程中
try {
if (txObject.getSessionHolder() == null || txObject.getSessionHolder().isSynchronizedWithTransaction()) {
Interceptor entityInterceptor = getEntityInterceptor();
Session newSession = (entityInterceptor != null ?
getSessionFactory().withOptions().interceptor(entityInterceptor).openSession() :
getSessionFactory().openSession());
if (logger.isDebugEnabled()) {
logger.debug("Opened new Session [" + newSession + "] for Hibernate transaction");
}
txObject.setSession(newSession);
}
//从SessionHolder中得到Session
session = txObject.getSessionHolder().getSession();
if (this.prepareConnection && isSameConnectionForEntireSession(session)) {
// We're allowed to change the transaction settings of the JDBC Connection.
if (logger.isDebugEnabled()) {
logger.debug("Preparing JDBC Connection of Hibernate Session [" + session + "]");
}
Connection con = ((SessionImplementor) session).connection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
if (this.allowResultAccessAfterCompletion && !txObject.isNewSession()) {
int currentHoldability = con.getHoldability();
if (currentHoldability != ResultSet.HOLD_CURSORS_OVER_COMMIT) {
txObject.setPreviousHoldability(currentHoldability);
con.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
}
}
else {
// Not allowed to change the transaction settings of the JDBC Connection.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
// We should set a specific isolation level but are not allowed to...
throw new InvalidIsolationLevelException(
"HibernateTransactionManager is not allowed to support custom isolation levels: " +
"make sure that its 'prepareConnection' flag is on (the default) and that the " +
"Hibernate connection release mode is set to 'on_close' (the default for JDBC).");
}
if (logger.isDebugEnabled()) {
logger.debug("Not preparing JDBC Connection of Hibernate Session [" + session + "]");
}
}
//如果事务方法定义为只读并且这是一个新的Session 那么设置session的FlushMode 为Never/MANUAL
if (definition.isReadOnly() && txObject.isNewSession()) {
// Just set to MANUAL in case of a new Session for this transaction.
session.setFlushMode(FlushMode.MANUAL);
}
//如果事务方法不是只读方法并且这也不是一个新的Session,设置FlushMode 为Auto
if (!definition.isReadOnly() && !txObject.isNewSession()) {
// We need AUTO or COMMIT for a non-read-only transaction.
FlushMode flushMode = session.getFlushMode();
if (session.getFlushMode().equals(FlushMode.MANUAL)) {
session.setFlushMode(FlushMode.AUTO);
txObject.getSessionHolder().setPreviousFlushMode(flushMode);
}
}
Transaction hibTx;
// 如果指定了timeout,需要设置timeout 然后开始事务
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
// Use Hibernate's own transaction timeout mechanism on Hibernate 3.1+
// Applies to all statements, also to inserts, updates and deletes!
hibTx = session.getTransaction();
hibTx.setTimeout(timeout);
hibTx.begin();//开启事务
}
//如果没有指定timeout,则直接开启事务
else {
// Open a plain Hibernate transaction without specified timeout.
hibTx = session.beginTransaction();
}
// 将这个HibernateTransaction 放入到sessionHolder中
txObject.getSessionHolder().setTransaction(hibTx);
// Register the Hibernate Session's JDBC Connection for the DataSource, if set.
if (getDataSource() != null) {
Connection con = ((SessionImplementor) session).connection();
ConnectionHolder conHolder = new ConnectionHolder(con);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
conHolder.setTimeoutInSeconds(timeout);
}
if (logger.isDebugEnabled()) {
logger.debug("Exposing Hibernate transaction as JDBC transaction [" + con + "]");
}
TransactionSynchronizationManager.bindResource(getDataSource(), conHolder);
txObject.setConnectionHolder(conHolder);
}
// 将SessionHolder绑定到内存
if (txObject.isNewSessionHolder()) {
TransactionSynchronizationManager.bindResource(getSessionFactory(), txObject.getSessionHolder());
}
txObject.getSessionHolder().setSynchronizedWithTransaction(true);
}
catch (Throwable ex) {
if (txObject.isNewSession()) {
try {
if (session.getTransaction().isActive()) {
session.getTransaction().rollback();
}
}
catch (Throwable ex2) {
logger.debug("Could not rollback Session after failed transaction begin", ex);
}
finally {
SessionFactoryUtils.closeSession(session);
txObject.setSessionHolder(null);
}
}
throw new CannotCreateTransactionException("Could not open Hibernate Session for transaction", ex);
}
}
4.2 事务的挂起
在前面getTransaction的时候,会存在一些情况需要挂起当前事务。调用了suspend方法,而在这个方法中,实际调用的依然是底层的doSuspend方法。HibernateTransactionManager实现了这个方法,也就是说Hibernate事务处理器支持事务挂起。
而这个事务挂起方法实际上就是把与这个事务相关的资源与当前的sessionHolder解除关系,也可以说是与当前线程解除关系,并把它保存在另外一个容器SuspendResourcesHolder中。这样可以方便事务的恢复。
protected Object doSuspend(Object transaction) {
HibernateTransactionObject txObject = (HibernateTransactionObject) transaction;
//把当前的sessionHolder从线程中和TransactionObject中释放
txObject.setSessionHolder(null);
SessionHolder sessionHolder =
(SessionHolder) TransactionSynchronizationManager.unbindResource(getSessionFactory());
//把当前的connectionHolder从线程是和TransactionObject中释放
txObject.setConnectionHolder(null);
ConnectionHolder connectionHolder = null;
if (getDataSource() != null) {
connectionHolder = (ConnectionHolder) TransactionSynchronizationManager.unbindResource(getDataSource());
}
return new SuspendedResourcesHolder(sessionHolder, connectionHolder);
}
4.3 事务的提交
在前面的分析中,可以看到事务提交的入口是commitTransactionAfterReturning方法。
if (txInfo != null && txInfo.hasTransaction()) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
它实际调用的依然是AbstractPlatformTransactionManager中的commit方法。这个方法与getTransaction十分相似,它是提交过程的纲领,具体的工作交由processCommit或processRollBack来处理,对应的也就是提交过程中可能出现的两种情况。
4.3.1 commit方法
在这个方法的第一行有一个状态的判断,如果事务已经完成,就会抛出异常。笔者就曾经遇到这种情况,在利用Spring开启事务的情况下,依然选择了提交事务,这个时候就抛出了异常。
public final void commit(TransactionStatus status) throws TransactionException {
//如果事务已经提交完成 抛出异常
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
//如果事务过程中发生异常,就要回滚
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus);//处理回滚
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus);
// Throw UnexpectedRollbackException only at outermost transaction boundary
// or if explicitly asked to.
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
return;
}
//处理提交
processCommit(defStatus);
}
4.3.2 processCommit方法
这个方法是处理提交的方法,在这里完成了事务提交的准备工作。在这个提交的过程中,会对当前事务进行一次判断,
如果是一个全新的事务,就交给具体的事务管理器提交,如果不是一个新的事务,则不会发起提交操作,而是把这个提交任务交给已经存在的事务进行
。实际的提交动作依然由具体的事务管理器去处理。、
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
//事务的提交准备工作
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
boolean globalRollbackOnly = false;
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
globalRollbackOnly = status.isGlobalRollbackOnly();
}
//如果存在嵌套事务,会首先释放掉这个嵌套事务
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
status.releaseHeldSavepoint();
}
// 仅对全新的事务进行提交工作
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
//交给具体的事务管理器去处理
doCommit(status);
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (globalRollbackOnly) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
catch (Error err) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, err);
throw err;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
4.3.3 doCommit方法
对于这个方法已经很简单了,就是从sessionHolder中获得transaction,然后执行提交动作。
protected void doCommit(DefaultTransactionStatus status) {
HibernateTransactionObject txObject = (HibernateTransactionObject) status.getTransaction();
if (status.isDebug()) {
logger.debug("Committing Hibernate transaction on Session [" +
txObject.getSessionHolder().getSession() + "]");
}
try {
//获得transaction并提交
txObject.getSessionHolder().getTransaction().commit();
}
catch (org.hibernate.TransactionException ex) {
// assumably from commit call to the underlying JDBC connection
throw new TransactionSystemException("Could not commit Hibernate transaction", ex);
}
catch (HibernateException ex) {
// assumably failed to flush changes to database
throw convertHibernateAccessException(ex);
}
}
4.4 事务的回滚
在前面的invokeWithinTransaction方法中,24行执行了一个completeTransactionAfterThrowing方法,在这里面处理了目标方法执行过程中发生异常的情况。
异常发生后,会根据异常的类型决定是否发生回滚操作。这就是第一个回滚的入口
,另外一个回滚的入口就是在4.3中提到的回滚情况。回滚的执行与commit很类似,AbstractPlatformTransactionManager中定义了一个processRollBack方法
4.4.1
processRollback
方法
这个方法与processCommit是极为相似的。它首先去事务进行了一些判断,处理嵌套事务的回滚。依然只处理当新建事务中回滚,如果不是新建事务,交给前一个事务去处理回滚。实际的回滚动作依然由具体的事务管理器去处理。
private void processRollback(DefaultTransactionStatus status) {
try {
try {
triggerBeforeCompletion(status);
//处理嵌套事务的回滚
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
//只处理新建事务的回滚
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
//具体动作由具体的事务管理器完成
doRollback(status);
}
//非新建事务的回滚由前一个事务去处理
else if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
}
catch (RuntimeException ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
catch (Error err) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw err;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
}
finally {
cleanupAfterCompletion(status);
}
}
4.4.2 doRollback方法
Hibernate事务管理器对事务回滚的处理也是很简单的,从SessionHolder中获得transaction并调用回滚方法。
protected void doRollback(DefaultTransactionStatus status) {
//获得事务
HibernateTransactionObject txObject = (HibernateTransactionObject) status.getTransaction();
if (status.isDebug()) {
logger.debug("Rolling back Hibernate transaction on Session [" +
txObject.getSessionHolder().getSession() + "]");
}
try {
//Hibernate事务回滚
txObject.getSessionHolder().getTransaction().rollback();
}
catch (org.hibernate.TransactionException ex) {
throw new TransactionSystemException("Could not roll back Hibernate transaction", ex);
}
catch (HibernateException ex) {
// Shouldn't really happen, as a rollback doesn't cause a flush.
throw convertHibernateAccessException(ex);
}
finally {
if (!txObject.isNewSession() && !this.hibernateManagedSession) {
// Clear all pending inserts/updates/deletes in the Session.
// Necessary for pre-bound Sessions, to avoid inconsistent state.
txObject.getSessionHolder().getSession().clear();
}
}
}
5 小结
从声明式事务的整个实现可以看到,这个过程就是一个完整的Spring AOP的实现。它根据我们的配置将dao层或service层封装成代理对象,以此所有方法的调用都会触发invoke方法进入AOP环节。在实际业务方法之前,进行了事务的开启动作以及事务的挂起操作,在业务方法之后,进行了事务的提交或回滚操作。而这些实际的操作则是由底层的事务管理器去实现的。