java多线程—统计汇总的问题
文章目录
1. 问题描述
在现实开发中,为了考虑性能问题,我们可以把大量的数据拆分成不同的
Batch
进行批处理,这边使用到了
ExecutorService.submit()
的线程池处理方法。那么问题也来了,到底怎么对数据进行汇总可以满足
线程安全
且可以
优雅的获得全局数据
?
下面简单的给出一段代码,当然这个代码肯定是有问题的。
由于代码版权问题,这里对代码逻辑进行了简化。
/**
* count profile num.
* <p>
* The number of profile is generated by a random int;
* Not from true project
* </p>
*
* @param profileNum int
*/
private void count(int profileNum) {
int tmp = 0;
tmp += profileNum;
trueCount.getAndAdd(profileNum);
count += tmp;
}
/**
* run without any thread safe op.
*
* @throws InterruptedException exception
*/
private void runNaked() throws InterruptedException {
Random rand = new Random(1000);
Runnable runnable = () -> {
int profileCount = rand.nextInt(10);
count(profileCount);
};
for (int i = 0; i < 1000; i++) {
executor.submit(runnable);
}
System.out.println("this is end: count = " + count + " true count: " + trueCount);
System.out.println("result:" + (trueCount.toString().equals(count.toString())));
}
/*
OutPut :
this is end: count = 4419 true count: 4478
result:false
*/
如果
runnable
的内部处理过程时间长,
result count
会出现0的情况!
注意
:这边有两个地方需要解决:1)任务总数的正确,2)count的线程安全!
2. 问题解决方式
2.0 瑕疵方法
我一开始考虑到既然任务没有完成,那么总数OK就OK了。显然没有考虑第二个因素。那么就产生了如下的
垃圾
代码。
/**
* run with threadPool shutdown().
*
* @throws InterruptedException exception
*/
private void runUnsafe() throws InterruptedException {
Random rand = new Random(1000);
Runnable runnable = () -> {
int profileCount = rand.nextInt(10);
count(profileCount);
};
for (int i = 0; i < 1000; i++) {
executor.submit(runnable);
}
// executor shutdown 使得这个线程池无法再用。
executor.shutdown();
while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
System.out.println("waiting ... ");
}
System.out.println("this is end: count = " + count + " true count: " + trueCount);
System.out.println("result:" + (trueCount.toString().equals(count.toString())));
}
/*
this is end: count = 4463 true count: 4478
result:false
*/
很显然,false的很明显!
虽然使用了
shutdown()
和
awaitTermination
的组合拳,但是也随之产生了两个问题。
-
对于
Springboot
的
AutoWired
来说,
bean
默认是单例模式,那么把线程池关掉会不会导致其他地方使用出问题! -
count()
这个并非线程安全。
2.1 使用synchronized修饰count
第一个可行的方案就是在2.0的基础上对
count
进行修饰,这边定义为
count2
/**
* Add profile num to total count.
* <p>
* synchronized method
* </p>
*
* @param profileNum int
*/
private synchronized void count2(int profileNum) {
int tmp = 0;
tmp += profileNum;
trueCount.getAndAdd(profileNum);
count += tmp;
}
/**
* use synchronized method.
*
* @throws InterruptedException Exception
*/
private void runSafeWithSych() throws InterruptedException {
Random rand = new Random(1000);
Runnable runnable = () -> {
int profileCount = rand.nextInt(10);
count2(profileCount);
};
for (int i = 0; i < 1000; i++) {
executor.submit(runnable);
}
// executor shutdown 使得这个线程池无法再用。
executor.shutdown();
while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
System.out.println("waiting ... ");
}
System.out.println("this is end: count = " + count + " true count: " + trueCount);
System.out.println("result:" + (trueCount.toString().equals(count.toString())));
}
/*
this is end: count = 4478 true count: 4478
result:true
*/
这样总算正确了,是不是代表就ok了??
细想一下,我们能不能不对
count()
进行修饰,好像加个
ReentrantLock
就很ok了,那验证一下。
2.2 ReentrantLock版本
/**
* use ReentrantLock.
* <p>
* 可以暂时解决线程不安全的问题,但是关闭了线程池
* </p>
*
* @throws InterruptedException Exception
*/
private void runSafe1WithReentrantLock() throws InterruptedException {
Random rand = new Random(1000);
Runnable runnable = () -> {
lock.lock();
int profileCount = rand.nextInt(10);
count(profileCount);
lock.unlock();
};
for (int i = 0; i < 1000; i++) {
executor.submit(runnable);
}
executor.shutdown();
while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
System.out.println("waiting ... ");
}
System.out.println("this is end: count = " + count + " true count: " + trueCount);
System.out.println("result:" + (trueCount.toString().equals(count.toString())));
executor.shutdownNow();
}
/*
this is end: count = 4478 true count: 4478
result:true
*/
这样基本差不多了,可是
shutdown()
的问题没有解决。
下面使用
CountDownLatch
解决这个问题。
2.3 加上CountDownLatch
/**
* ReentrantLockANDCountDownLatch + CountDownLatch.
*
* <p>
* 使用CountDownLatch来进行一个任务的同步统计,但是对于一种不知道数量大小的任务来说,这个很难实现。
* </p>
*
* @throws InterruptedException exception
*/
private void runSafe1WithReentrantLockANDCountDownLatch() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1000);
Random rand = new Random(1000);
Runnable runnable = () -> {
try {
while (!isAllSubmit) {
TimeUnit.SECONDS.sleep(1);
}
lock.lock();
int profileCount = rand.nextInt(10);
count(profileCount);
lock.unlock();
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
countDownLatch.countDown();
}
};
for (int i = 0; i < 1000; i++) {
executor.submit(runnable);
}
isAllSubmit = true;
countDownLatch.await();
System.out.println("this is end: count = " + count + " true count: " + trueCount);
System.out.println("result:" + (trueCount.toString().equals(count.toString())));
executor.shutdownNow();
}
/*
this is end: count = 4478 true count: 4478
result:true
*/
但是这个问题也有个
局限
:如果对于批处理的总任务数不清楚的话,这种方式就不那么方便。
这边就考虑了使用
List<Future>
来接收
submit()
的返回值,使用
Future::get()
来解决。
2.4 Future方法
/**
* ReentrantLock + Future.
* <p>
* 使用Future join的特性来解决
* </p>
*
* @throws InterruptedException exception
*/
private void runSafe1WithReentrantLockAndFuture() throws InterruptedException {
Random rand = new Random(1000);
Runnable runnable = () -> {
try {
lock.lock();
int profileCount = rand.nextInt(10);
count(profileCount);
lock.unlock();
} catch (Exception e) {
System.out.println(e.getMessage());
}
};
List<Future> futures = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
futures.add(executor.submit(runnable));
}
for (Future future :
futures) {
try {
future.get();
} catch (ExecutionException e) {
System.out.println(e.getMessage());
}
}
System.out.println("this is end: count = " + count + " true count: " + trueCount);
System.out.println("result:" + (trueCount.toString().equals(count.toString())));
executor.shutdownNow();
}
/*
this is end: count = 4478 true count: 4478
result:true
*/
3. 总结
这是对工作遭遇到的问题进行解决的思路。