java多线程下的事务控制–二阶段提交

  • Post author:
  • Post category:java




java多线程事务控制





前言

本文记录了spring环境中,基于多线程场景下的事务控制机制。



目标:

  1. 主线程等待子线程事务完整提交完毕后才执行提交,否则进行回滚。
  2. 每个子线程事务等待其他其他子线程执行结果,有异常,全部回滚。



主要逻辑:

子线程手动回滚,主线程自动回滚

  1. 手动控制每个子线程的事务状态,catch异常,有异常,则更新公共变量。
  2. 每个子线程等待其他子线程执行结果,都执行完后根据公共变量值判断执行rollback或commit操作。
  3. 所有子线程执行完后,主线程根据公共变量判断是否存在执行异常情况,有的话,主线程抛出异常,执行回滚操作。



一、利用CountDownLatch实现

CountDownLatch提供了灵活的方案,可在子线程完成好其他线程依赖的工作后调用countDown()方法主动减少计数,同时继续做线程间业务无依赖的其他工作。

代码如下(示例):


@Slf4j
@Service
public class TransactionService {

    @Resource
    private ExecutorService executorService;

    @Resource
    private PlatformTransactionManager platformTransactionManager;

    @Resource
    private TransactionDefinition transactionDefinition;


    @Transactional
    public void multithreadedTransaction(List<User> userList) {
    	// 主线程计数器,用于控制所有子线程执行完成后的执行逻辑
        CountDownLatch mainLatch = new CountDownLatch(1);
        // 子线程计数器,用于等待子线程的执行结束
        CountDownLatch childLatch = new CountDownLatch(userList.size());
        AtomicBoolean rollbackFlag = new AtomicBoolean(false);
        userList.forEach(entity -> executorService.execute(() -> {
        	// 获取当前事务状态对象
            final TransactionStatus transactionStatus = orderTransactionManager.getTransaction(transactionDefinition);
            try {
                // 这里处理业务逻辑
                // UserMapper.update(entity);
            } catch (Exception e) {
                log.warn("{}更新失败:", Thread.currentThread().getName(), e);
                rollbackFlag.set(true);
            }
            // 第1,处理业务逻辑结束,计数器-1,等待其他子线程
            childLatch.countDown();
            try {
                // 第4,主线程等待
                mainLatch.await();
                // 第5,主线程、子线程继续执行
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 第6,子线程回滚/提交
            if (rollbackFlag.get()) {
                orderTransactionManager.rollback(transactionStatus);
                return;
            }
            orderTransactionManager.commit(transactionStatus);
        }));

        try {
            // 第2,所有子线程业务逻辑处理完后,子线程继续执行,主线程也继续
            childLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 第3,主线程计数器-1。
            mainLatch.countDown();
        }
        // 第6,主线程回滚/提交
        if (rollbackFlag.get()) {
            log.warn("多线程执行失败,事务回滚");
            throw new RuntimeException();
        }
    }

}
    

逻辑如代码注释



二、利用CyclicBarrier实现


CyclicBarrier

是 Java 并发包中的一个同步工具类,用于协调多个线程在某个屏障处等待,直到所有线程都到达屏障后才能继续执行后续操作。CyclicBarrier 的主要特点是可以重复使用,即当所有等待线程都被释放后,CyclicBarrier 可以被重置并再次使用。

@Slf4j
@Service
public class TransactionService {

    @Resource
    private ExecutorService executorService;

    @Resource
    private PlatformTransactionManager platformTransactionManager;

    @Resource
    private TransactionDefinition transactionDefinition;

    /**
     * 利用CyclicBarrier实现多线程下的事务控制--二阶段提交
     * @param userList 待处理的用户列表
     */
    @Transactional
    public void multithreadedTransaction(List<User> userList) {
        // 第1步,创建CyclicBarrier
        CyclicBarrier cyclicBarrier = new CyclicBarrier(userList.size() + 1);
        // 创建提交标记
        AtomicBoolean commitFlag = new AtomicBoolean(true);
        // 创建TransactionStatus集合,用于记录每个线程的事务状态
        Map<Thread, TransactionStatus> transactionStatusMap = new ConcurrentHashMap<>();

        userList.forEach(entity -> executorService.execute(() -> {
            // 第2步,创建TransactionStatus对象
            TransactionStatus transactionStatus = platformTransactionManager.getTransaction(transactionDefinition);
            try {
                // 第3步,执行业务逻辑
                // UserMapper.update(entity);
                log.info("{}更新成功", Thread.currentThread().getName());
            } catch (Exception e) {
                // 若执行过程中出现异常,则将提交标记置为false
                commitFlag.set(false);
                log.warn("{}更新失败:", Thread.currentThread().getName(), e);
            } finally {
                // 第4步,将当前线程的TransactionStatus对象存入集合中
                transactionStatusMap.put(Thread.currentThread(), transactionStatus);
                try {
                    // 第5步,等待其他线程执行完毕
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }));

        try {
            // 第6步,等待所有线程执行完毕
            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

        // 第7步,根据提交标记决定提交或回滚
        if (commitFlag.get()) {
            transactionStatusMap.values().forEach(platformTransactionManager::commit);
        } else {
            transactionStatusMap.values().forEach(platformTransactionManager::rollback);
            throw new RuntimeException("多线程执行失败,事务回滚");
        }
    }
}



总结

虽然上面的多线程事务二阶段提交的实现方式能够完成事务的提交和回滚,但是其存在一些缺陷:

  1. 阻塞等待:在第二阶段的提交过程中,由于需要等待所有的子线程都执行完毕后才能进行提交或者回滚操作,所以在这个过程中主线程会被阻塞等待,可能会导致主线程的性能瓶颈。
  2. 事务性能低:由于在多线程环境下,每个子线程都需要独立开启一个事务来进行数据库的操作,而且还需要等待所有的子线程执行完毕后才能进行提交或回滚操作,这样会导致事务的性能低下,影响整个应用的性能表现。
  3. 无法保证数据一致性:在第二阶段提交时,虽然通过协调器的方式保证了所有子线程都成功提交或者回滚了,但是在网络、数据库等各种因素的干扰下,可能出现只有部分子线程成功提交,而其他子线程失败的情况,这样会导致数据的不一致性。



版权声明:本文为CS5686原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。