Java multithreading: counting results across multithreaded batches

  • Post category: java




Java multithreading: the problem of aggregating counts



1. Problem description

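In real-world development, for performance reasons we often split a large amount of data into separate batches and process them in parallel; here that is done by submitting them to a thread pool with ExecutorService.submit(). This raises the question: how should the per-batch results be aggregated so that the aggregation is thread-safe and the global total can be obtained cleanly?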
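As a rough illustration of the setup just described, the sketch below cuts a list into fixed-size batches and submits one task per batch. The helper name splitIntoBatches, the batch size of 4 and the pool size are illustrative assumptions, not taken from the original project, and the sketch deliberately leaves open how the per-batch results are combined.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class BatchSplitter {
    // Hypothetical helper (not from the original project): cuts data into
    // consecutive batches of at most batchSize elements each.
    static <T> List<List<T>> splitIntoBatches(List<T> data, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < data.size(); from += batchSize) {
            int to = Math.min(from + batchSize, data.size());
            // copy, so a batch does not depend on later changes to the source list
            batches.add(new ArrayList<>(data.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> data = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
        ExecutorService executor = Executors.newFixedThreadPool(4); // pool size chosen arbitrarily
        for (List<Integer> batch : splitIntoBatches(data, 4)) {
            // One task per batch. How to combine the per-batch results safely
            // is the open question of this article.
            executor.submit(() -> System.out.println(Thread.currentThread().getName() + " processes " + batch));
        }
        executor.shutdown(); // only so this small demo can exit
    }
}
```

With 10 elements and a batch size of 4, splitIntoBatches produces three batches: [1, 2, 3, 4], [5, 6, 7, 8] and [9, 10].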

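Below is a short piece of code which, of course, has bugs.

Because of copyright restrictions on the original project, the logic has been simplified here.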

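    // Fields used by every example in this article. The original omits their
    // declarations; the types below are assumptions inferred from how they are used:
    // private ExecutorService executor;                       // shared thread pool
    // private AtomicInteger trueCount = new AtomicInteger();  // atomic reference counter
    // private Integer count = 0;                              // boxed, since count.toString() is called
    // private ReentrantLock lock = new ReentrantLock();
    // private volatile boolean isAllSubmit = false;           // used in 2.3; must be volatile

    /**
     * Count profile num.
     * <p>
     *     The number of profiles is generated by a random int;
     *     it does not come from the real project.
     * </p>
     *
     * @param profileNum int
     */
    private void count(int profileNum) {
        int tmp = 0;
        tmp += profileNum;
        trueCount.getAndAdd(profileNum); // atomic: never loses an update
        count += tmp;                    // NOT atomic: read-modify-write on a shared field, updates can be lost
    }

    /**
     * Run without any thread-safety measures.
     *
     * @throws InterruptedException exception
     */
    private void runNaked() throws InterruptedException {
        Random rand = new Random(1000);
        Runnable runnable = () -> {
            int profileCount = rand.nextInt(10);
            count(profileCount);
        };
        for (int i = 0; i < 1000; i++) {
            executor.submit(runnable);
        }
        // BUG: the totals are printed immediately, without waiting for the submitted tasks to finish
        System.out.println("this is end: count = " + count + " true count: " + trueCount);
        System.out.println("result:" + (trueCount.toString().equals(count.toString())));
    }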

/*
  Output:
  this is end: count = 4419 true count: 4478
  result:false
*/

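If the work inside the runnable takes longer, the printed count can even be 0, because runNaked() reads the counters right after submitting the tasks, before any of them has finished.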
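To see the timing problem in isolation, here is a minimal sketch that uses an AtomicInteger (so the updates themselves are safe) and a CountDownLatch named gate as an assumed stand-in for slow task work: no task can finish until the main thread has already read the counter, just as runNaked() reads it without waiting. The class ReadTooEarlyDemo, the pool size and the task count are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ReadTooEarlyDemo {
    /** Returns {value read right after submitting, value read after all tasks finished}. */
    static int[] run(int tasks) {
        AtomicInteger total = new AtomicInteger();   // atomic, so only the timing issue is shown
        CountDownLatch gate = new CountDownLatch(1); // stands in for "slow" task work
        ExecutorService executor = Executors.newFixedThreadPool(4);
        for (int i = 0; i < tasks; i++) {
            executor.submit(() -> {
                try {
                    gate.await(); // block until the main thread has taken its early reading
                    total.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int early = total.get(); // like runNaked(): no task has been allowed to finish yet
        gate.countDown();        // now let all tasks proceed
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS); // wait for every task before reading again
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {early, total.get()};
    }
}
```

Calling ReadTooEarlyDemo.run(100) returns 0 for the early read and 100 for the read taken after the pool has terminated.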


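Note: two things need to be fixed here: 1) the total must only be read once every task has finished, and 2) updates to count must be thread-safe.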



2. Solutions



2.0 A flawed approach

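My first idea was that the tasks simply had not finished yet, so waiting for them would be enough to get the total right. That obviously ignores the second issue, and it produced the following rubbish code.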

    /**
     * Run with threadPool shutdown().
     *
     * @throws InterruptedException exception
     */
    private void runUnsafe() throws InterruptedException {
        Random rand = new Random(1000);
        Runnable runnable = () -> {
            int profileCount = rand.nextInt(10);
            count(profileCount); // still not thread-safe
        };
        for (int i = 0; i < 1000; i++) {
            executor.submit(runnable);
        }
        // shutdown() means this thread pool cannot be used again afterwards.
        executor.shutdown();
        // waits for all submitted tasks to finish, which fixes problem 1 only
        while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
            System.out.println("waiting ... ");
        }
        System.out.println("this is end: count = " + count + " true count: " + trueCount);
        System.out.println("result:" + (trueCount.toString().equals(count.toString())));
    }

/*
  this is end: count = 4463 true count: 4478
  result:false
*/
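The output shows that waiting alone is not enough: trueCount (an AtomicInteger) is right while count is not. The hypothetical sketch below reproduces that in isolation: every task adds 1 both to a plain Integer field and to an AtomicInteger, and both are read only after awaitTermination(). The pool size of 8 and the task count are arbitrary assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LostUpdateDemo {
    private Integer count = 0;                                   // plain shared field, like count
    private final AtomicInteger trueCount = new AtomicInteger(); // atomic counter, like trueCount

    /** Returns {count, trueCount} after `tasks` tasks have each added 1 and the pool has terminated. */
    int[] run(int tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(8);
        for (int i = 0; i < tasks; i++) {
            executor.submit(() -> {
                count += 1;                  // read, add, write back: two threads can overwrite each other
                trueCount.incrementAndGet(); // single atomic step: no update is lost
            });
        }
        executor.shutdown();
        try {
            // problem 1 is handled: every task has finished before the counters are read
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {count, trueCount.get()};
    }
}
```

The atomic counter always equals the number of tasks. The plain counter can only come out equal or lower, since a lost update drops an increment but never adds one, and how much lower varies from run to run.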

Clearly, the result is still false.

Although this uses shutdown() together with awaitTermination(), which does wait for every task, it introduces two problems:

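  • With Spring Boot and @Autowired, a bean is a singleton by default, so if the thread pool is such a bean, shutting it down here may break every other place that uses it.

  • count() is still not thread-safe.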
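The first concern can be checked directly. This minimal sketch assumes a pool that is meant to be shared (for example, one exposed as a singleton bean); once one caller invokes shutdown(), any later submit() is rejected. ShutdownDemo and the pool size are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

class ShutdownDemo {
    /** Returns true if a task submitted after shutdown() is rejected. */
    static boolean submitAfterShutdownIsRejected() {
        // stands in for a pool that several components share (e.g. a singleton bean)
        ExecutorService sharedPool = Executors.newFixedThreadPool(2);
        sharedPool.shutdown(); // what runUnsafe() does once its own batch is done
        try {
            sharedPool.submit(() -> System.out.println("never printed"));
            return false;
        } catch (RejectedExecutionException e) {
            // pools created by Executors use the default AbortPolicy, which throws here
            return true;
        }
    }
}
```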



2.1 Making count() synchronized

The first workable solution builds on 2.0 and adds the synchronized modifier to count(); the modified method is called count2 here.

    /**
     * Add profile num to total count.
     * <p>
     *     synchronized method
     * </p>
     *
     * @param profileNum int
     */
    private synchronized void count2(int profileNum) {
        int tmp = 0;
        tmp += profileNum;
        trueCount.getAndAdd(profileNum);
        count += tmp; // safe as long as count is only modified inside synchronized methods of this object
    }

    /**
     * Use synchronized method.
     *
     * @throws InterruptedException Exception
     */
    private void runSafeWithSych() throws InterruptedException {
        Random rand = new Random(1000);
        Runnable runnable = () -> {
            int profileCount = rand.nextInt(10);
            count2(profileCount);
        };
        for (int i = 0; i < 1000; i++) {
            executor.submit(runnable);
        }
        // shutdown() means this thread pool cannot be used again afterwards.
        executor.shutdown();
        while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
            System.out.println("waiting ... ");
        }
        System.out.println("this is end: count = " + count + " true count: " + trueCount);
        System.out.println("result:" + (trueCount.toString().equals(count.toString())));
    }

/*
 this is end: count = 4478 true count: 4478
 result:true
*/

The result is finally correct. Does that mean we are done?

On reflection, can we avoid modifying count() itself? Wrapping the call in a ReentrantLock looks like it should work just as well, so let's verify that.



2.2 ReentrantLock version

    /**
     * Use ReentrantLock.
     * <p>
     *     Solves the thread-safety problem for now, but still shuts down the thread pool.
     * </p>
     *
     * @throws InterruptedException Exception
     */
    private void runSafe1WithReentrantLock() throws InterruptedException {
        Random rand = new Random(1000);
        Runnable runnable = () -> {
            lock.lock();
            try {
                int profileCount = rand.nextInt(10);
                count(profileCount);
            } finally {
                lock.unlock(); // release the lock even if count() throws
            }
        };
        for (int i = 0; i < 1000; i++) {
            executor.submit(runnable);
        }
        executor.shutdown();
        while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
            System.out.println("waiting ... ");
        }
        System.out.println("this is end: count = " + count + " true count: " + trueCount);
        System.out.println("result:" + (trueCount.toString().equals(count.toString())));
        executor.shutdownNow(); // redundant here: the pool has already terminated
    }

/*
this is end: count = 4478 true count: 4478
result:true
*/
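One detail worth tightening in any lock-based version: if the code between lock() and unlock() throws, a bare unlock() is skipped and the pool thread keeps holding the lock, so tasks on other worker threads block in lock() forever. The hypothetical LockFinallyDemo below shows the difference on a single thread; failingWork() is an assumed stand-in for count() failing halfway through.

```java
import java.util.concurrent.locks.ReentrantLock;

class LockFinallyDemo {
    // Hypothetical stand-in for count(...) failing inside the critical section
    private static void failingWork() {
        throw new IllegalStateException("simulated failure inside the critical section");
    }

    /** Returns whether the lock is still held after failingWork() threw under it. */
    static boolean stillLockedAfterFailure(boolean unlockInFinally) {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.lock();
            if (unlockInFinally) {
                try {
                    failingWork();
                } finally {
                    lock.unlock(); // runs even though failingWork() throws
                }
            } else {
                failingWork();
                lock.unlock(); // never reached: the exception skips it
            }
        } catch (IllegalStateException e) {
            // swallowed only so the lock state can be inspected below
        }
        return lock.isLocked();
    }
}
```

stillLockedAfterFailure(false) returns true (the lock leaks), while stillLockedAfterFailure(true) returns false.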

This is almost there, but the shutdown() problem is still unsolved.

Next, a CountDownLatch is used to solve it.



2.3 Adding a CountDownLatch

    /**
     * ReentrantLock + CountDownLatch.
     *
     * <p>
     *     Uses a CountDownLatch to wait until every task has finished; this is hard to apply
     *     when the number of tasks is not known in advance.
     * </p>
     *
     * @throws InterruptedException exception
     */
    private void runSafe1WithReentrantLockANDCountDownLatch() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1000); // must equal the number of tasks
        Random rand = new Random(1000);
        Runnable runnable = () -> {
            try {
                // Simulates slow tasks: each task waits until all tasks have been submitted
                // (isAllSubmit must be volatile for this loop to see the update).
                while (!isAllSubmit) {
                    TimeUnit.SECONDS.sleep(1);
                }
                lock.lock();
                try {
                    int profileCount = rand.nextInt(10);
                    count(profileCount);
                } finally {
                    lock.unlock();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                countDownLatch.countDown(); // always count down, otherwise await() could hang
            }
        };
        for (int i = 0; i < 1000; i++) {
            executor.submit(runnable);
        }
        isAllSubmit = true;
        countDownLatch.await(); // blocks until all 1000 tasks have counted down; no shutdown() needed
        System.out.println("this is end: count = " + count + " true count: " + trueCount);
        System.out.println("result:" + (trueCount.toString().equals(count.toString())));
        executor.shutdownNow(); // only to end this demo; a shared pool would be left running
    }
/*
	this is end: count = 4478 true count: 4478
	result:true
*/
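The key property of the latch approach is that the caller waits on the latch rather than on pool termination, so the same pool can be reused for the next batch run. A minimal sketch under that assumption, with a hypothetical LatchRoundDemo class, an arbitrary pool size of 4, and an AtomicInteger in place of the lock-protected counter:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class LatchRoundDemo {
    private final ExecutorService executor = Executors.newFixedThreadPool(4); // long-lived, shared pool

    /** Runs one round of `tasks` tasks, each adding 1, and waits for them with a latch. */
    int runRound(int tasks) {
        AtomicInteger total = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks); // the task count must be known up front
        for (int i = 0; i < tasks; i++) {
            executor.submit(() -> {
                try {
                    total.incrementAndGet();
                } finally {
                    done.countDown(); // count down even if the work throws
                }
            });
        }
        try {
            done.await(); // blocks until every task of this round has counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return total.get();
    }

    boolean poolStillUsable() {
        return !executor.isShutdown();
    }

    void close() {
        executor.shutdown(); // only so a demo JVM can exit
    }
}
```

Running two rounds of 100 and 50 tasks on the same instance returns 100 and then 50, and the pool stays open between rounds.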

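This approach has one limitation, though: if the total number of batch tasks is not known in advance, it is not very convenient.

So the next version collects the return values of submit() in a List<Future> and calls Future::get() on each of them.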



2.4 The Future approach

    /**
     * ReentrantLock + Future.
     * <p>
     *     Relies on Future.get() blocking until its task completes, similar to Thread.join().
     * </p>
     *
     * @throws InterruptedException exception
     */
    private void runSafe1WithReentrantLockAndFuture() throws InterruptedException {
        Random rand = new Random(1000);
        Runnable runnable = () -> {
            lock.lock();
            try {
                int profileCount = rand.nextInt(10);
                count(profileCount);
            } finally {
                lock.unlock(); // exceptions now reach future.get() as an ExecutionException
            }
        };
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            futures.add(executor.submit(runnable));
        }
        // get() blocks until that task has finished; no task count is needed up front
        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (ExecutionException e) {
                System.out.println(e.getMessage());
            }
        }
        System.out.println("this is end: count = " + count + " true count: " + trueCount);
        System.out.println("result:" + (trueCount.toString().equals(count.toString())));
        executor.shutdownNow(); // only to end this demo; a shared pool would be left running
    }
/*
	this is end: count = 4478 true count: 4478
	result:true
*/
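A variation on the same idea, offered as an alternative rather than the author's method: if each task returns its own partial count as a Callable<Integer>, the caller can add up the values from Future.get() on its own thread, so neither a lock nor a shared counter is needed, and the number of tasks still does not have to be known in advance. FutureSumDemo and the per-task counts are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

class FutureSumDemo {
    /** Each task returns its own count; the caller sums them, so no shared counter or lock is needed. */
    static long sumWithFutures(List<Integer> perTaskCounts, ExecutorService executor) {
        List<Future<Integer>> futures = new ArrayList<>();
        for (int n : perTaskCounts) {
            Callable<Integer> task = () -> n; // stand-in for the real per-batch work
            futures.add(executor.submit(task));
        }
        long total = 0;
        try {
            for (Future<Integer> f : futures) {
                total += f.get(); // blocks until that task is done; only the caller's thread touches total
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for batch results", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a batch task failed", e.getCause());
        }
        return total;
    }
}
```

For example, sumWithFutures(Arrays.asList(3, 5, 7, 9), pool) returns 24, and the pool remains usable for further calls.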



3. Summary

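This post walked through how I solved a problem I ran into at work. In short, two things are needed: wait until every task has finished (awaitTermination(), a CountDownLatch, or Future.get()), and protect the shared counter (synchronized, a ReentrantLock, or an atomic type such as the AtomicInteger used for trueCount, which was correct in every run once the tasks had finished). The CountDownLatch and Future versions do not depend on shutting down the thread pool to wait for the tasks, and the Future version does not need to know the number of tasks in advance.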



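Copyright notice: this is an original article by weixin_44078008, published under the CC 4.0 BY-SA license. When reposting, please include a link to the original source and this notice.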