spring 多租户数据源实现事务一致性

  • Post author:
  • Post category:其他

前言

之前我写了一篇spring boot 集成 mybatis-plus实现多租户数据源本来用的很爽,但是最近需要实现不同租户之间的事务一致性,所以讲解决方案记录如下

大致思路

先假设有两个租户 A,B,分别需要执行方法 ma(),mb()
伪代码如下


try{
	ma();
	mb();
}catch(..){
	.....
}

首先我第一时间想到用编程式事务操作,编程式事务能可控制的让所有事务一起提交、一起回滚,如果用平常使用的@Transactional 注解是不行的,因为执行的sql对应的数据源都不一样,Transactional 这个注解产生的事务会让所有在 当前线程运行sql都取自第一次获取的connection

什么意思呢

就是说如果先运行ma方法,那么mb方法运行时会报数据表不存在,因为dataSource事务管理器会缓存当前线程第一次获取的DataSourceTransactionObject
具体查看
org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction
org.springframework.transaction.support.TransactionSynchronizationManager#doGetResource

private static Object doGetResource(Object actualKey) {
		// resources 是ThreadLocal
		Map<Object, Object> map = resources.get();
		if (map == null) {
			return null;
		}
		Object value = map.get(actualKey);
		// Transparently remove ResourceHolder that was marked as void...
		if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
			map.remove(actualKey);
			// Remove entire ThreadLocal if empty...
			if (map.isEmpty()) {
				resources.remove();
			}
			value = null;
		}
		return value;
	}

org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin

protected void doBegin(Object transaction, TransactionDefinition definition) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		Connection con = null;

		try {
			...
			txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
			con = txObject.getConnectionHolder().getConnection();

			....
			// 加入当前线程的缓存
			if (txObject.isNewConnectionHolder()) {
				TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
			}
		}

		catch (Throwable ex) {
			....
			throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
		}
	}

那么我们得想办法让ma()和mb()在不同的线程中运行

代码实现

下图是网上编程式事务的大致用法
在这里插入图片描述

@Transactional(rollbackFor = Exception.class)
    public void payTest() throws Exception{
    
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setPropagationBehavior(DefaultTransactionDefinition.PROPAGATION_REQUIRED);
        def.setReadOnly(false);


		//单线程的线程池,保证数据库相关异步阻塞操作都为一个线程
        ThreadPoolExecutor singleThreadExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1));


		// spring 容器中获取事务管理器
        PlatformTransactionManager userTransactionManager = SpringContext.getBean(PlatformTransactionManager.class);

        TransactionStatus userTransaction = null;
        Future<TransactionStatus> submit = singleThreadExecutor.submit(() -> {
            DyDataSource.setDbKey(SysConstParam.SYS_DB_KEY);
            TransactionStatus transaction = userTransactionManager.getTransaction(def);

            System.out.println("over1" + Thread.currentThread().getName() + ":" + Thread.currentThread().getId());

            return transaction;
        });

        userTransaction = submit.get();

        try {
            ma();


            Future<Boolean> futureTask = singleThreadExecutor.submit(() -> {
                mb();
                return false;

            });

            Boolean aBoolean = futureTask.get();
            if (!aBoolean) {
                throw new ServiceException("op fail");
            }

            TransactionStatus finalUserTransaction = userTransaction;

			// 手动提交
            singleThreadExecutor.submit(() -> {
                userTransactionManager.commit(finalUserTransaction);
            }).get();


        } catch (Exception e) {
            TransactionStatus finalUserTransaction1 = userTransaction;
            // 手动回滚代码
            singleThreadExecutor.submit(() -> {
                finalUserTransaction1.setRollbackOnly();
                userTransactionManager.rollback(finalUserTransaction1);
            }).get();
            e.printStackTrace();
            throw e;
        }

    }

如上大致完成所需

需要改进

  • 这种写法需要留下重要注释,否则一旦事务没有手动回滚、手动提交会很麻烦
  • 大量异步阻塞操作
  • 代码侵入性太强了

如果哪位朋友有更好更优雅的方案欢迎下方留言


版权声明:本文为weixin_39660224原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。