前言
之前我写了一篇spring boot 集成 mybatis-plus实现多租户数据源本来用的很爽,但是最近需要实现不同租户之间的事务一致性,所以讲解决方案记录如下
大致思路
先假设有两个租户 A,B,分别需要执行方法 ma(),mb()
伪代码如下
try{
ma();
mb();
}catch(..){
.....
}
首先我第一时间想到用编程式事务操作,编程式事务能可控制的让所有事务一起提交、一起回滚,如果用平常使用的@Transactional
注解是不行的,因为执行的sql对应的数据源都不一样,Transactional
这个注解产生的事务会让所有在 当前线程运行sql都取自第一次获取的connection
什么意思呢
就是说如果先运行ma方法,那么mb方法运行时会报数据表不存在,因为dataSource事务管理器会缓存当前线程第一次获取的DataSourceTransactionObject
具体查看
org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction
org.springframework.transaction.support.TransactionSynchronizationManager#doGetResource
private static Object doGetResource(Object actualKey) {
// resources 是ThreadLocal
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
...
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
....
// 加入当前线程的缓存
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
....
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
那么我们得想办法让ma()和mb()在不同的线程中运行
代码实现
下图是网上编程式事务的大致用法
@Transactional(rollbackFor = Exception.class)
public void payTest() throws Exception{
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(DefaultTransactionDefinition.PROPAGATION_REQUIRED);
def.setReadOnly(false);
//单线程的线程池,保证数据库相关异步阻塞操作都为一个线程
ThreadPoolExecutor singleThreadExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1));
// spring 容器中获取事务管理器
PlatformTransactionManager userTransactionManager = SpringContext.getBean(PlatformTransactionManager.class);
TransactionStatus userTransaction = null;
Future<TransactionStatus> submit = singleThreadExecutor.submit(() -> {
DyDataSource.setDbKey(SysConstParam.SYS_DB_KEY);
TransactionStatus transaction = userTransactionManager.getTransaction(def);
System.out.println("over1" + Thread.currentThread().getName() + ":" + Thread.currentThread().getId());
return transaction;
});
userTransaction = submit.get();
try {
ma();
Future<Boolean> futureTask = singleThreadExecutor.submit(() -> {
mb();
return false;
});
Boolean aBoolean = futureTask.get();
if (!aBoolean) {
throw new ServiceException("op fail");
}
TransactionStatus finalUserTransaction = userTransaction;
// 手动提交
singleThreadExecutor.submit(() -> {
userTransactionManager.commit(finalUserTransaction);
}).get();
} catch (Exception e) {
TransactionStatus finalUserTransaction1 = userTransaction;
// 手动回滚代码
singleThreadExecutor.submit(() -> {
finalUserTransaction1.setRollbackOnly();
userTransactionManager.rollback(finalUserTransaction1);
}).get();
e.printStackTrace();
throw e;
}
}
如上大致完成所需
需要改进
- 这种写法需要留下重要注释,否则一旦事务没有手动回滚、手动提交会很麻烦
- 大量异步阻塞操作
- 代码侵入性太强了
如果哪位朋友有更好更优雅的方案欢迎下方留言
版权声明:本文为weixin_39660224原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。