并发编程(1)-java中的6中线程状态
并发编程(2)-怎么中断线程?
并发编程(3)-synchronized的实现原理
并发编程(4)-深入理解volatile关键字
并发编程(5)-ReentrantLock源码分析
并发编程(6)-Condition源码分析
并发编程(7)-juc阻塞队列介绍
并发编程(8)-什么是异步责任链
并发编程(9)-Semaphore介绍和源码分析
并发编程(10)-CyclicBarrier的使用及其源码分析
一 案例
public class ForkJoinExample {
//java8 parallStream
//针对一个数字,做计算。
private static final Integer MAX=200;
static class CalcForJoinTask extends RecursiveTask<Integer> {
private Integer startValue; //子任务的开始计算的值
private Integer endValue; //子任务结束计算的值
public CalcForJoinTask(Integer startValue, Integer endValue) {
this.startValue = startValue;
this.endValue = endValue;
}
@Override
protected Integer compute() {
//如果当前的数据区间已经小于MAX了,那么接下来的计算不需要做拆分
if(endValue-startValue<MAX){
System.out.println("开始计算:startValue:"+startValue+" ; endValue:"+endValue);
Integer totalValue=0;
for(int i=this.startValue;i<=this.endValue;i++){
totalValue+=i;
}
return totalValue;
}
CalcForJoinTask subTask=new CalcForJoinTask(startValue,(startValue+endValue)/2);
subTask.fork();
CalcForJoinTask calcForJoinTask=new CalcForJoinTask((startValue+endValue)/2+1,endValue);
calcForJoinTask.fork();
return subTask.join()+calcForJoinTask.join();
}
}
public static void main(String[] args) {
CalcForJoinTask calcForJoinTask=new CalcForJoinTask(1,10000);
ForkJoinPool pool=new ForkJoinPool();
ForkJoinTask<Integer> taskFuture=pool.submit(calcForJoinTask);
try {
Integer result=taskFuture.get();
System.out.println("result:"+result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
二、流程图
为了更清晰的了解fork/join的原理,我们通过一个图形来理解。整体思想其实就是拆分与合并。
图中最顶层的任务使用submit方式被提交到Fork/Join框架中, Fork/Join把这个任务放入到某个线程 中运行,工作任务中的compute方法的代码开始对这个任务T1进行分析。如果当前任务需要累加的数 字范围过大(代码中设定的是大于200),则将这个计算任务拆分成两个子任务(T1.1和T1.2),每个 子任务各自负责计算一半的数据累加,请参见代码中的fork方法。如果当前子任务中需要累加的数字范 围足够小(小于等于200),就进行累加然后返回到上层任务中。
三、Fork/Join API代码分析
简单给大家解释一下Fork/Join的相关api,在刚刚的案例中,涉及到几个重要的API, ForkJoinTask,ForkJoinPool .
ForkJoinTask : 基本任务,使用fork、join框架必须创建的对象,提供fork,join操作,常用的三个子类
- RecursiveAction : 无结果返回的任务
- RecursiveTask : 有返回结果的任务
- CountedCompleter :无返回值任务,完成任务后可以触发回调。
ForkJoinTask提供了两个重要的方法:
- fork : 让task异步执行
- join : 让task同步执行,可以获取返回值
ForkJoinPool: 专门用来运行 ForkJoinTask 的线程池,(在实际使用中,也可以接收 Runnable/Callable 任务,但在真正运行时,也会把这些任务封装成 ForkJoinTask 类型的任务)
方法名
invoke(ForkJoinTask t)
:提交任务并一直阻塞直到任务执行完成返回合并结果。
execute(ForkJoinTask t)
:异步执行任务,无返回值。
submit(ForkJoinTask t)
:异步执行任务,返回task本身,可以通过task.get()方法获取合并之后的 结果。
ForkJoinTask 在不显示使用 ForkJoinPool.execute/invoke/submit()方法进行执行的情况下,也可 以使用自己的fork/invoke方法进行执行