18、事务的创建

  • Post author:
  • Post category:其他


上一节,我们围着spring事务绕了一圈,大致看了下事务的外貌,挺漂亮的,但是咱是有内涵的人,做人不能只看外表,俗话说好看的皮囊千千万,有趣的灵魂万里挑一,所以光有一张好看的皮囊是不够的,关键还得看内存,虽然现实好像不是这样。 好了,在进行目标方法调用前,spring肯定要通过数据库连接设置事务属性,这样才能启动事务。

protected TransactionInfo org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(
			PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {

		// If no name specified, apply method identification as transaction name.
		//如果存在事务属性并且没有设置名字,那么使用连接点作为事务名
		if (txAttr != null && txAttr.getName() == null) {
			txAttr = new DelegatingTransactionAttribute(txAttr) {
				@Override
				public String getName() {
					return joinpointIdentification;
				}
			};
		}

		TransactionStatus status = null;
		if (txAttr != null) {
			if (tm != null) {
			    //根据事务属性设置事务状态,并将状态封装成模型
			    //(*1*)
				status = tm.getTransaction(txAttr);
			}
			else {
				if (logger.isDebugEnabled()) {
					logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
							"] because no transaction manager has been configured");
				}
			}
		}
		//构建TransactionInfo,并将TransactionInfo绑定到当前线程中
		return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
	}
	
	
	//(*1*)
	public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
	    //(*2*)
	    //获取事务对象
		Object transaction = doGetTransaction();

		// Cache debug flag to avoid repeated checks.
		boolean debugEnabled = logger.isDebugEnabled();

        //如果没有事务属性,那么创建一个默认的事务属性
		if (definition == null) {
			// Use defaults if no transaction definition given.
			definition = new DefaultTransactionDefinition();
		}

        //判断是否已经存在事务了,只要判断事务对象中是否存在ConnectHolder并且这个ConnectHolder处于激活状态
        //就表示已经存在事务
		if (isExistingTransaction(transaction)) {
			// Existing transaction found -> check propagation behavior to find out how to behave.
			//根据事务传播行为处理存在的事务
			return handleExistingTransaction(definition, transaction, debugEnabled);
		}

		// Check definition settings for new transaction.
		//判断事务超时值是否有效,无效的抛错
		if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
			throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
		}

		// No existing transaction found -> check propagation behavior to find out how to proceed.
		//PROPAGATION_MANDATORY这种类型的事务传播行为在不存在事务时,抛出错误
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
			throw new IllegalTransactionStateException(
					"No existing transaction found for transaction marked with propagation 'mandatory'");
		}
		//以下类型的事务传播行为在当前不存在事务时是一样的操作逻辑,都是创建一个新的事务
		else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
				//挂起事务,事务就不做事务的挂起,返回null,如果存在激活的事务同步器,那么循环挂起每个事务同步器
				//将被挂起的资源封装到SuspendedResourcesHolder中,以便下次使用
			SuspendedResourcesHolder suspendedResources = suspend(null);
			if (debugEnabled) {
				logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
			}
			try {
			    //是否允许构建新的事务同步
				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
				//创建事务状态,第一个参数是事务属性,第一个参数是事务对象,第三个参数表示是否是创建的新事务
				//第四个参数表示是否需要创建新的事务同步,第五个参数表示是否允许打印debug级别的日志,
				//第六个参数封装的是挂起的资源
				DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
				//开始事务
				doBegin(transaction, definition);
				//预备事务同步
				prepareSynchronization(status, definition);
				return status;
			}
			catch (RuntimeException ex) {
			    //恢复上一个事务,回滚当前事务,清理当前事务的资源
				resume(null, suspendedResources);
				throw ex;
			}
			catch (Error err) {
			    //恢复上一个事务,回滚当前事务,清理当前事务的资源
				resume(null, suspendedResources);
				throw err;
			}
		}
		//其他事务传播行为,以非事务方式处理
		else {
			// Create "empty" transaction: no actual transaction, but potentially synchronization.
			if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
				logger.warn("Custom isolation level specified but no actual transaction initiated; " +
						"isolation level will effectively be ignored: " + definition);
			}
			//不存在事务时,只有同步类型为SYNCHRONIZATION_ALWAYS才允许创建事务同步
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
		}
	}
	
	//(*2*)
	protected Object org.springframework.jdbc.datasource.DataSourceTransactionManager.doGetTransaction() {
		DataSourceTransactionObject txObject = new DataSourceTransactionObject();
		//设置是否允许嵌入式的事务(保存点)
		txObject.setSavepointAllowed(isNestedTransactionAllowed());
		//从当前线程中获取数据库连接holder
		ConnectionHolder conHolder =
				(ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
		//将连接holder记录到当前事务对象中,并标注这是一个旧事务
		txObject.setConnectionHolder(conHolder, false);
		return txObject;
	}

事务同步:用于在操作事务时,比如commit,rollback,AfterCommit等操作时进行事件同步,相当于事件监听。 我们在深入了解事务之前,我们先来复习下spring事务传播行为

事务传播行为类型 说明
PROPAGATION_REQUIRED 如果当前没有事务,就新建一个事务,如果已经存在一个事务中,加入到这个事务中。这是最常见的选择
PROPAGATION_SUPPORTS 支持当前事务,如果当前没有事务,就以非事务方式执行
PROPAGATION_MANDATORY 使用当前的事务,如果当前没有事务,就抛出异常。
PROPAGATION_REQUIRES_NEW 新建事务,如果当前存在事务,把当前事务挂起。
PROPAGATION_NOT_SUPPORTED 以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
PROPAGATION_NEVER 以非事务方式执行,如果当前存在事务,则抛出异常。
PROPAGATION_NESTED 如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与PROPAGATION_REQUIRED类似的操作。

我先来学习下不存在事务时是如何创建一个新事务的

protected void org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(Object transaction, TransactionDefinition definition) {
        //事务对象
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		Connection con = null;

		try {
		    //如果当前事务没有设置数据库连接,或者没有进行事务同步,那么创建连接
			if (txObject.getConnectionHolder() == null ||
					txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
					//使用数据连接池获取数据库连接
				Connection newCon = this.dataSource.getConnection();
				if (logger.isDebugEnabled()) {
					logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
				}
				//给当前事务对象设置数据连接,并标识为新创建的数据库连接
				txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
			}
            //设置事务同步为true
			txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
			con = txObject.getConnectionHolder().getConnection();
            //(*1*)
            //使用事务属性定义的值去设置数据库连接,返回旧的隔离级别
			Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
			//储存旧的隔离级别
			txObject.setPreviousIsolationLevel(previousIsolationLevel);

			// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
			// so we don't want to do it unnecessarily (for example if we've explicitly
			// configured the connection pool to set it already).
			//如果数据库连接默认是自动提交的,那么设置为手动提交,并标识这个连接使用完后必须恢复为自动提交
			if (con.getAutoCommit()) {
				txObject.setMustRestoreAutoCommit(true);
				if (logger.isDebugEnabled()) {
					logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
				}
				//设置为手动提交
				con.setAutoCommit(false);
			}
			//设置事务状态为已激活
			txObject.getConnectionHolder().setTransactionActive(true);
            //设置事务超时时间
			int timeout = determineTimeout(definition);
			if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
				txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
			}

			// Bind the session holder to the thread.
			//如果是一个新的连接,那么将当前数据库连接绑定到当前线程中,以备hibernate,mybatis或者其他的框架获取
			if (txObject.isNewConnectionHolder()) {
				TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
			}
		}

		catch (Throwable ex) {
			if (txObject.isNewConnectionHolder()) {
				DataSourceUtils.releaseConnection(con, this.dataSource);
				txObject.setConnectionHolder(null, false);
			}
			throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
		}
	}
	
	
	
	//(*1*)
	public static Integer prepareConnectionForTransaction(Connection con, TransactionDefinition definition)
			throws SQLException {

		Assert.notNull(con, "No Connection specified");

		// Set read-only flag.
		if (definition != null && definition.isReadOnly()) {
			try {
				if (logger.isDebugEnabled()) {
					logger.debug("Setting JDBC Connection [" + con + "] read-only");
				}
			    //设置只读属性
				con.setReadOnly(true);
			}
			catch (SQLException ex) {
				Throwable exToCheck = ex;
				while (exToCheck != null) {
					if (exToCheck.getClass().getSimpleName().contains("Timeout")) {
						// Assume it's a connection timeout that would otherwise get lost: e.g. from JDBC 4.0
						throw ex;
					}
					exToCheck = exToCheck.getCause();
				}
				// "read-only not supported" SQLException -> ignore, it's just a hint anyway
				logger.debug("Could not set JDBC Connection read-only", ex);
			}
			catch (RuntimeException ex) {
				Throwable exToCheck = ex;
				while (exToCheck != null) {
					if (exToCheck.getClass().getSimpleName().contains("Timeout")) {
						// Assume it's a connection timeout that would otherwise get lost: e.g. from Hibernate
						throw ex;
					}
					exToCheck = exToCheck.getCause();
				}
				// "read-only not supported" UnsupportedOperationException -> ignore, it's just a hint anyway
				logger.debug("Could not set JDBC Connection read-only", ex);
			}
		}

		// Apply specific isolation level, if any.
		Integer previousIsolationLevel = null;
		if (definition != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
			if (logger.isDebugEnabled()) {
				logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " +
						definition.getIsolationLevel());
			}
			//设置数据库隔离级别
			int currentIsolation = con.getTransactionIsolation();
			if (currentIsolation != definition.getIsolationLevel()) {
				previousIsolationLevel = currentIsolation;
				con.setTransactionIsolation(definition.getIsolationLevel());
			}
		}

		return previousIsolationLevel;
	}

除了获取数据库连接,将事务属性值设置到连接上之外,spring还将当前获取到的数据库连接绑定到了线程中,以便提供给mybatis,hibernate等框架获取使用。解析来spring又进行了事务同步的设置

protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
        //只有在允许事务同步的情况下才进行一下设置
		if (status.isNewSynchronization()) {
		    //记录当前事务已经激活
			TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
			//设置隔离级别
			TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
					definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
							definition.getIsolationLevel() : null);
			//记录当前事务是否只读
			TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
			//记录当前事务的事务名,一般都是连接点的类名加方法名
			TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
			//初始化事务同步,内部过多的操作,只是设置了一个空的set集合到当前线程中
			TransactionSynchronizationManager.initSynchronization();
		}
	}

上面代码中有个叫TransactionSynchronizationManager的类,这个类是专门用来管理事务同步的,我们看下它成员变量定义

//用于保存事务资源,比如上面提到的ConnectHolder
private static final ThreadLocal<Map<Object, Object>> resources =
			new NamedThreadLocal<Map<Object, Object>>("Transactional resources");

//用于保存事务同步
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
		new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");
//用于记录当前事务名
private static final ThreadLocal<String> currentTransactionName =
		new NamedThreadLocal<String>("Current transaction name");
//标识当前事务是否为只读
private static final ThreadLocal<Boolean> currentTransactionReadOnly =
		new NamedThreadLocal<Boolean>("Current transaction read-only status");
//记录当前事务的隔离级别
private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
		new NamedThreadLocal<Integer>("Current transaction isolation level");
//记录当前事务是否已经被激活
private static final ThreadLocal<Boolean> actualTransactionActive =
		new NamedThreadLocal<Boolean>("Actual transaction active");

以上是不存在事务的情况下的处理,那么存在事务的时候这么处理的呢?

private TransactionStatus handleExistingTransaction(
			TransactionDefinition definition, Object transaction, boolean debugEnabled)
			throws TransactionException {
        //PROPAGATION_NEVER 如果当前存在事务,抛错
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
			throw new IllegalTransactionStateException(
					"Existing transaction found for transaction marked with propagation 'never'");
		}
        //PROPAGATION_NOT_SUPPORTED 如果存在事务,挂起当前事务
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
			if (debugEnabled) {
				logger.debug("Suspending current transaction");
			}
			//挂起当前事务
			Object suspendedResources = suspend(transaction);
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			//创建TransactionStatus,第二参数为null,表示没有事务对象,第三个参数为false表示不需要创建新的事务
			//最后一个参数就是我们挂起的事务资源
			return prepareTransactionStatus(
					definition, null, false, newSynchronization, debugEnabled, suspendedResources);
		}
        //PROPAGATION_REQUIRES_NEW 挂起当前事务,创建一个新的事务
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
			if (debugEnabled) {
				logger.debug("Suspending current transaction, creating new transaction with name [" +
						definition.getName() + "]");
			}
			//挂起事务
			SuspendedResourcesHolder suspendedResources = suspend(transaction);
			try {
				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
				//创建DefaultTransactionStatus
				DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
				//开始事务
				doBegin(transaction, definition);
				//预备事务同步
				prepareSynchronization(status, definition);
				return status;
			}
			catch (RuntimeException beginEx) {
			    //恢复挂起事务
				resumeAfterBeginException(transaction, suspendedResources, beginEx);
				throw beginEx;
			}
			catch (Error beginErr) {
			    //恢复挂起事务
				resumeAfterBeginException(transaction, suspendedResources, beginErr);
				throw beginErr;
			}
		}
        //PROPAGATION_NESTED 不存在事务时,创建创建事务,存在事务时,以嵌套事务的方式执行
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
		    //如果不允许嵌套事务,那么抛出异常
			if (!isNestedTransactionAllowed()) {
				throw new NestedTransactionNotSupportedException(
						"Transaction manager does not allow nested transactions by default - " +
						"specify 'nestedTransactionAllowed' property with value 'true'");
			}
			if (debugEnabled) {
				logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
			}
			//允许使用Savepoint
			if (useSavepointForNestedTransaction()) {
				// Create savepoint within existing Spring-managed transaction,
				// through the SavepointManager API implemented by TransactionStatus.
				// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
				DefaultTransactionStatus status =
						prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
				//创建保存点
				status.createAndHoldSavepoint();
				return status;
			}
			else {
				// Nested transaction through nested begin and commit/rollback calls.
				// Usually only for JTA: Spring synchronization might get activated here
				// in case of a pre-existing JTA transaction.
				//通过嵌套的begin,commit/rollback管理事务,按照注释的说法,这种方式通知只用于jta分布式事务
				//jta分布式本来还没使用过呢
				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
				DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, true, newSynchronization, debugEnabled, null);
				doBegin(transaction, definition);
				prepareSynchronization(status, definition);
				return status;
			}
		}

		// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
		if (debugEnabled) {
			logger.debug("Participating in existing transaction");
		}
		//校验存在的事务
		if (isValidateExistingTransaction()) {
			if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
				Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
				//如果当前事务级别不是默认隔离级别,但是事务同步中获取的隔离级别与定义的不相符,抛错
				if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
					Constants isoConstants = DefaultTransactionDefinition.constants;
					throw new IllegalTransactionStateException("Participating transaction with definition [" +
							definition + "] specifies isolation level which is incompatible with existing transaction: " +
							(currentIsolationLevel != null ?
									isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
									"(unknown)"));
				}
			}
			//不是只读的属性,却被设置成了只读
			if (!definition.isReadOnly()) {
				if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
					throw new IllegalTransactionStateException("Participating transaction with definition [" +
							definition + "] is not marked as read-only but existing transaction is");
				}
			}
		}
		boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
		//创建TransactionStatus,记录当前事务状态
		return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
	}

当存在事务的时候,我们可能不想使用当前这个事务,我们想另起一个事务继续执行,以免异常时导致外层事务发生回滚,所以我们需要挂起原来的事务

protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
        //如果存在事务同步,那么首先挂起这些事务同步并清除当前线程的同步数据
		if (TransactionSynchronizationManager.isSynchronizationActive()) {
			List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
			try {
			    //用于保存挂起资源
				Object suspendedResources = null;
				if (transaction != null) {
				    //挂起事务,将当前线程的数据库连接解绑,事务对象应用的数据连接设置为null
					suspendedResources = doSuspend(transaction);
				}
				//清空与当前事务相关的记录
				String name = TransactionSynchronizationManager.getCurrentTransactionName();
				TransactionSynchronizationManager.setCurrentTransactionName(null);
				boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
				TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
				Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
				TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
				boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
				TransactionSynchronizationManager.setActualTransactionActive(false);
				//构建挂起资源holder,用于保存当前事务状态
				return new SuspendedResourcesHolder(
						suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
			}
			catch (RuntimeException ex) {
				// doSuspend failed - original transaction is still active...
				doResumeSynchronization(suspendedSynchronizations);
				throw ex;
			}
			catch (Error err) {
				// doSuspend failed - original transaction is still active...
				//发生异常,恢复挂起资源
				doResumeSynchronization(suspendedSynchronizations);
				throw err;
			}
		}
		else if (transaction != null) {
		    //清空事务对象中数据库连接
			// Transaction active but no synchronization active.
			Object suspendedResources = doSuspend(transaction);
			return new SuspendedResourcesHolder(suspendedResources);
		}
		else {
			// Neither transaction nor synchronization active.
			return null;
		}
	}

下面是创建事务信息类图

[外链图片转存失败(img-ymhxz5Iu-1564910743704)(612D5BD52E09472888F5392BE46A4CA2)]

下一节,我们继续分析事务的回滚和提交



版权声明:本文为qq_27785239原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。