SpringBoot项目中控制线程池、多线程事务提交、回滚的方式

  • Post author:
  • Post category:其他



场景:


1、由于多线程每个线程都是一个异步任务,所以每个线程都是一个单独的事务,通常使用的声明式事务 @Transactional() 是无法控制多线程中事务的

2、所以只能另寻解决方式


解决:


一、基于TransactionStatus集合来控制多线程事务提交(推荐此方式)

1、代码案例

    @Autowired
    private DataSourceTransactionManager dataSourceTransactionManager;
    List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<>());




    @Override
    @Transactional(rollbackFor = {Exception.class})
    public void testMultiThreadTransactional() throws BizException {

        //模拟总数据
        List<SysUserAddress> sysUserAddresses = ListUtil.toList();
        for (int i = 0; i < 10000; i++) {
            sysUserAddresses.add(new SysUserAddress(null, "上海市" + (i + 1), "上海市", "浦东新区"));
        }

        //线程数,按线程数拆分,默认3个线程
        int threadCount = 3;
        //按线程数平均分配后,每个线程处理的数据量
        int perThreadData = sysUserAddresses.size() / threadCount;
        //按线程数平均分配后,多余的数据量
        int remainderCount = sysUserAddresses.size() % threadCount;
        //有多余的数据,再开个线程处理
        boolean havingRemainder = remainderCount > 0;
        if (havingRemainder) {
            threadCount += 1;
        }

        //子线程倒计锁
        CountDownLatch threadLatchs = new CountDownLatch(threadCount);
        //子线程中是否有异常标识
        AtomicBoolean isError = new AtomicBoolean(false);

        try {
            for (int i = 0; i < threadCount; i++) {
                //设置每个线程处理的数据量,多余的数据放在最后一个线程中处理
                List<SysUserAddress> splitList = sysUserAddresses.stream()
                        .skip((long) i * perThreadData)
                        .limit((i == threadCount - 1) ? (havingRemainder ? remainderCount : perThreadData) : perThreadData)
                        .collect(Collectors.toList());

                //开启多线程
                executorService.execute(() -> {
                    try {
                        try {
                            this.sysUserAddressService.saveSysUserAddressByTransaMan(dataSourceTransactionManager, transactionStatuses, splitList);
                        } catch (Throwable e) {
                            e.printStackTrace();
                            isError.set(true);
                        } finally {
                            threadLatchs.countDown();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        isError.set(true);
                    }
                });
            }

            // 倒计锁设置超时时间 30s
            boolean await = threadLatchs.await(300, TimeUnit.SECONDS);
            // 判断是否超时
            if (!await) {
                isError.set(true);
                log.error("等待子线程执行已经超时!");
            }
        } catch (Throwable e) {
            e.printStackTrace();
            isError.set(true);
        }

        if (CollUtil.isNotEmpty(transactionStatuses)) {
            if (isError.get()) {
                transactionStatuses.forEach(status -> {
                    if (!status.isCompleted()) {
                        dataSourceTransactionManager.rollback(status);
                    }
                });
            } else {
                transactionStatuses.forEach(status -> {
                    if (!status.isCompleted()) {
                        dataSourceTransactionManager.commit(status);
                    }
                });
            }
        }

        System.out.println("主线程完成!");

    }








    @Override
    @Transactional(rollbackFor = {Exception.class})
    public void saveSysUserAddressByTransaMan(PlatformTransactionManager transactionManager, List<TransactionStatus> transactionStatuses, List<SysUserAddress> sysUserAddressList) {
        if (CollUtil.isEmpty(sysUserAddressList)) {
            return;
        }

        //将事务状态都放在同一个事务里面
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setPropagationBehavior(Propagation.REQUIRES_NEW.value());                   // 事物隔离级别,每个线程都开启新事务,会比较安全一些
        TransactionStatus transactionStatus = transactionManager.getTransaction(def);   // 获得事务状态
        transactionStatuses.add(transactionStatus);

        sysUserAddressList.forEach(obj -> {
//            if (StrUtil.equals(obj.getProvince(), "上海市2")) {
//                //模拟子线程中保存出现异常
//                int i = 1 / 0;
//            }
            synchronized (obj) {
                save(obj);
            }
        });
        System.out.println("子线程:" + Thread.currentThread().getName());
    }

运行结果:

成功写入10000条数据。插入数据过程中模拟出现异常,事务会全部回滚

备注:

【1】如果处理数据过程中报错,请参考【2】方式解决:

【2】

https://blog.csdn.net/hkl_Forever/article/details/129117265


二、基于两个 CountDownLatch(倒计锁) 控制多线程事务

1、代码案例

        //查询总数据
        List<SysUserAddress> sysUserAddresses = ListUtil.toList();
        for (int i = 0; i < 10000; i++) {
            sysUserAddresses.add(new SysUserAddress(null, "上海市" + (i + 1), "上海市", "浦东新区"));
        }

        //线程数,按线程数拆分,默认3个线程
        int threadCount = 3;
        //按线程数平均分配后,每个线程处理的数据量
        int perThreadData = sysUserAddresses.size() / threadCount;
        //按线程数平均分配后,多余的数据量
        int remainderCount = sysUserAddresses.size() % threadCount;
        //有多余的数据,再开个线程处理
        boolean havingRemainder = remainderCount > 0;
        if (havingRemainder) {
            threadCount += 1;
        }

        //用统计计算子线程提交数量
        CountDownLatch threadLatchs = new CountDownLatch(threadCount);
        //用于判断主线程是否提交,默认1
        CountDownLatch mainLatch = new CountDownLatch(1);
        //用于判断子线程任务是否有错误
        AtomicBoolean isError = new AtomicBoolean(false);

        for (int i = 0; i < threadCount; i++) {
            //处理平均分配的数据量,多余的数据放在最后一个线程中处理
            List<SysUserAddress> splitList = sysUserAddresses.stream()
                    .skip((long) i * perThreadData)
                    .limit((i == threadCount - 1) ? havingRemainder ? remainderCount : perThreadData : perThreadData)
                    .collect(Collectors.toList());
            System.out.println("splitList = " + splitList);

            //此时可以开启多线程处理数据,提高并发处理效率
            executorService.execute(() -> {
                //保存测试数据信息
                this.saveSysUserAddress(splitList, threadLatchs, mainLatch, isError);
            });
        }

        try {
            //等待被子线程唤醒,等待时间 30s
            boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
            if (!await) {
                //等待超时,标记为有异常
                isError.set(true);
            }
        } catch (Throwable e) {
            e.printStackTrace();
            isError.set(true);
        }
        mainLatch.countDown(); //主线程倒计数,用于唤醒主线程

        System.out.println("主线程执行完成!");






    private void saveSysUserAddress(List<SysUserAddress> splitList, CountDownLatch threadLatchs, CountDownLatch mainLatch, AtomicBoolean isError) {
        System.out.println("子线程" + Thread.currentThread().getName() + "开始执行...");

        //当前事务状态
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);

        if (CollUtil.isEmpty(splitList)) {
            threadLatchs.countDown();   //子线程倒计数,用于唤醒子线程
            return;
        }
        splitList.forEach(obj -> {
            try {
                if (StrUtil.equals(obj.getProvince(), "上海市6")) {
                    //模拟子线程中此条数据出现异常
                    int i = 10 / 0;
                }
                sysUserAddressService.save(obj);
            } catch (Exception e) {
                isError.set(true);
                e.printStackTrace();
            } finally {
                threadLatchs.countDown();
            }
        });

        try {
            mainLatch.await();  //阻塞等待被主线程唤醒
        } catch (InterruptedException e) {
            isError.set(true);
            e.printStackTrace();
        }

        //主线程被唤醒,判断线程中是否有异常
        if (isError.get()) {
            dataSourceTransactionManager.rollback(transactionStatus);
        } else {
            dataSourceTransactionManager.commit(transactionStatus);
        }
    }

说明:

此方式没有第一种方式可靠,推荐第一种方式



版权声明:本文为hkl_Forever原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。