场景:
1、由于多线程每个线程都是一个异步任务,所以每个线程都是一个单独的事务,通常使用的声明式事务 @Transactional() 是无法控制多线程中事务的
2、所以只能另寻解决方式
解决:
一、基于TransactionStatus集合来控制多线程事务提交(推荐此方式)
1、代码案例
@Autowired
private DataSourceTransactionManager dataSourceTransactionManager;
List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<>());
@Override
@Transactional(rollbackFor = {Exception.class})
public void testMultiThreadTransactional() throws BizException {
//模拟总数据
List<SysUserAddress> sysUserAddresses = ListUtil.toList();
for (int i = 0; i < 10000; i++) {
sysUserAddresses.add(new SysUserAddress(null, "上海市" + (i + 1), "上海市", "浦东新区"));
}
//线程数,按线程数拆分,默认3个线程
int threadCount = 3;
//按线程数平均分配后,每个线程处理的数据量
int perThreadData = sysUserAddresses.size() / threadCount;
//按线程数平均分配后,多余的数据量
int remainderCount = sysUserAddresses.size() % threadCount;
//有多余的数据,再开个线程处理
boolean havingRemainder = remainderCount > 0;
if (havingRemainder) {
threadCount += 1;
}
//子线程倒计锁
CountDownLatch threadLatchs = new CountDownLatch(threadCount);
//子线程中是否有异常标识
AtomicBoolean isError = new AtomicBoolean(false);
try {
for (int i = 0; i < threadCount; i++) {
//设置每个线程处理的数据量,多余的数据放在最后一个线程中处理
List<SysUserAddress> splitList = sysUserAddresses.stream()
.skip((long) i * perThreadData)
.limit((i == threadCount - 1) ? (havingRemainder ? remainderCount : perThreadData) : perThreadData)
.collect(Collectors.toList());
//开启多线程
executorService.execute(() -> {
try {
try {
this.sysUserAddressService.saveSysUserAddressByTransaMan(dataSourceTransactionManager, transactionStatuses, splitList);
} catch (Throwable e) {
e.printStackTrace();
isError.set(true);
} finally {
threadLatchs.countDown();
}
} catch (Exception e) {
e.printStackTrace();
isError.set(true);
}
});
}
// 倒计锁设置超时时间 30s
boolean await = threadLatchs.await(300, TimeUnit.SECONDS);
// 判断是否超时
if (!await) {
isError.set(true);
log.error("等待子线程执行已经超时!");
}
} catch (Throwable e) {
e.printStackTrace();
isError.set(true);
}
if (CollUtil.isNotEmpty(transactionStatuses)) {
if (isError.get()) {
transactionStatuses.forEach(status -> {
if (!status.isCompleted()) {
dataSourceTransactionManager.rollback(status);
}
});
} else {
transactionStatuses.forEach(status -> {
if (!status.isCompleted()) {
dataSourceTransactionManager.commit(status);
}
});
}
}
System.out.println("主线程完成!");
}
@Override
@Transactional(rollbackFor = {Exception.class})
public void saveSysUserAddressByTransaMan(PlatformTransactionManager transactionManager, List<TransactionStatus> transactionStatuses, List<SysUserAddress> sysUserAddressList) {
if (CollUtil.isEmpty(sysUserAddressList)) {
return;
}
//将事务状态都放在同一个事务里面
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(Propagation.REQUIRES_NEW.value()); // 事物隔离级别,每个线程都开启新事务,会比较安全一些
TransactionStatus transactionStatus = transactionManager.getTransaction(def); // 获得事务状态
transactionStatuses.add(transactionStatus);
sysUserAddressList.forEach(obj -> {
// if (StrUtil.equals(obj.getProvince(), "上海市2")) {
// //模拟子线程中保存出现异常
// int i = 1 / 0;
// }
synchronized (obj) {
save(obj);
}
});
System.out.println("子线程:" + Thread.currentThread().getName());
}
运行结果:
成功写入10000条数据。插入数据过程中模拟出现异常,事务会全部回滚
备注:
【1】如果处理数据过程中报错,请参考【2】方式解决:
【2】
https://blog.csdn.net/hkl_Forever/article/details/129117265
二、基于两个 CountDownLatch(倒计锁) 控制多线程事务
1、代码案例
//查询总数据
List<SysUserAddress> sysUserAddresses = ListUtil.toList();
for (int i = 0; i < 10000; i++) {
sysUserAddresses.add(new SysUserAddress(null, "上海市" + (i + 1), "上海市", "浦东新区"));
}
//线程数,按线程数拆分,默认3个线程
int threadCount = 3;
//按线程数平均分配后,每个线程处理的数据量
int perThreadData = sysUserAddresses.size() / threadCount;
//按线程数平均分配后,多余的数据量
int remainderCount = sysUserAddresses.size() % threadCount;
//有多余的数据,再开个线程处理
boolean havingRemainder = remainderCount > 0;
if (havingRemainder) {
threadCount += 1;
}
//用统计计算子线程提交数量
CountDownLatch threadLatchs = new CountDownLatch(threadCount);
//用于判断主线程是否提交,默认1
CountDownLatch mainLatch = new CountDownLatch(1);
//用于判断子线程任务是否有错误
AtomicBoolean isError = new AtomicBoolean(false);
for (int i = 0; i < threadCount; i++) {
//处理平均分配的数据量,多余的数据放在最后一个线程中处理
List<SysUserAddress> splitList = sysUserAddresses.stream()
.skip((long) i * perThreadData)
.limit((i == threadCount - 1) ? havingRemainder ? remainderCount : perThreadData : perThreadData)
.collect(Collectors.toList());
System.out.println("splitList = " + splitList);
//此时可以开启多线程处理数据,提高并发处理效率
executorService.execute(() -> {
//保存测试数据信息
this.saveSysUserAddress(splitList, threadLatchs, mainLatch, isError);
});
}
try {
//等待被子线程唤醒,等待时间 30s
boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
if (!await) {
//等待超时,标记为有异常
isError.set(true);
}
} catch (Throwable e) {
e.printStackTrace();
isError.set(true);
}
mainLatch.countDown(); //主线程倒计数,用于唤醒主线程
System.out.println("主线程执行完成!");
private void saveSysUserAddress(List<SysUserAddress> splitList, CountDownLatch threadLatchs, CountDownLatch mainLatch, AtomicBoolean isError) {
System.out.println("子线程" + Thread.currentThread().getName() + "开始执行...");
//当前事务状态
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
if (CollUtil.isEmpty(splitList)) {
threadLatchs.countDown(); //子线程倒计数,用于唤醒子线程
return;
}
splitList.forEach(obj -> {
try {
if (StrUtil.equals(obj.getProvince(), "上海市6")) {
//模拟子线程中此条数据出现异常
int i = 10 / 0;
}
sysUserAddressService.save(obj);
} catch (Exception e) {
isError.set(true);
e.printStackTrace();
} finally {
threadLatchs.countDown();
}
});
try {
mainLatch.await(); //阻塞等待被主线程唤醒
} catch (InterruptedException e) {
isError.set(true);
e.printStackTrace();
}
//主线程被唤醒,判断线程中是否有异常
if (isError.get()) {
dataSourceTransactionManager.rollback(transactionStatus);
} else {
dataSourceTransactionManager.commit(transactionStatus);
}
}
说明:
此方式没有第一种方式可靠,推荐第一种方式
版权声明:本文为hkl_Forever原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。