11.forkJoin基本使用

  • Post author:
  • Post category:其他



并发编程(1)-java中的6中线程状态



并发编程(2)-怎么中断线程?



并发编程(3)-synchronized的实现原理



并发编程(4)-深入理解volatile关键字



并发编程(5)-ReentrantLock源码分析



并发编程(6)-Condition源码分析



并发编程(7)-juc阻塞队列介绍



并发编程(8)-什么是异步责任链



并发编程(9)-Semaphore介绍和源码分析



并发编程(10)-CyclicBarrier的使用及其源码分析



一 案例

public class ForkJoinExample {

    //java8 parallStream

    //针对一个数字,做计算。
    private static final Integer MAX=200;

    static class CalcForJoinTask extends RecursiveTask<Integer> {
        private Integer startValue; //子任务的开始计算的值
        private Integer endValue; //子任务结束计算的值

        public CalcForJoinTask(Integer startValue, Integer endValue) {
            this.startValue = startValue;
            this.endValue = endValue;
        }
        @Override
        protected Integer compute() {
            //如果当前的数据区间已经小于MAX了,那么接下来的计算不需要做拆分
            if(endValue-startValue<MAX){
                System.out.println("开始计算:startValue:"+startValue+" ; endValue:"+endValue);
                Integer totalValue=0;
                for(int i=this.startValue;i<=this.endValue;i++){
                    totalValue+=i;
                }
                return totalValue;
            }
            CalcForJoinTask subTask=new CalcForJoinTask(startValue,(startValue+endValue)/2);
            subTask.fork();
            CalcForJoinTask calcForJoinTask=new CalcForJoinTask((startValue+endValue)/2+1,endValue);
            calcForJoinTask.fork();
            return subTask.join()+calcForJoinTask.join();
        }
    }

    public static void main(String[] args) {
        CalcForJoinTask calcForJoinTask=new CalcForJoinTask(1,10000);
        ForkJoinPool pool=new ForkJoinPool();
        ForkJoinTask<Integer> taskFuture=pool.submit(calcForJoinTask);
        try {
            Integer result=taskFuture.get();
            System.out.println("result:"+result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

}	



二、流程图

为了更清晰的了解fork/join的原理,我们通过一个图形来理解。整体思想其实就是拆分与合并。

图中最顶层的任务使用submit方式被提交到Fork/Join框架中, Fork/Join把这个任务放入到某个线程 中运行,工作任务中的compute方法的代码开始对这个任务T1进行分析。如果当前任务需要累加的数 字范围过大(代码中设定的是大于200),则将这个计算任务拆分成两个子任务(T1.1和T1.2),每个 子任务各自负责计算一半的数据累加,请参见代码中的fork方法。如果当前子任务中需要累加的数字范 围足够小(小于等于200),就进行累加然后返回到上层任务中。

在这里插入图片描述



三、Fork/Join API代码分析

简单给大家解释一下Fork/Join的相关api,在刚刚的案例中,涉及到几个重要的API, ForkJoinTask,ForkJoinPool .

ForkJoinTask : 基本任务,使用fork、join框架必须创建的对象,提供fork,join操作,常用的三个子类

  • RecursiveAction : 无结果返回的任务
  • RecursiveTask : 有返回结果的任务
  • CountedCompleter :无返回值任务,完成任务后可以触发回调。

ForkJoinTask提供了两个重要的方法:

  • fork : 让task异步执行
  • join : 让task同步执行,可以获取返回值

ForkJoinPool: 专门用来运行 ForkJoinTask 的线程池,(在实际使用中,也可以接收 Runnable/Callable 任务,但在真正运行时,也会把这些任务封装成 ForkJoinTask 类型的任务)

方法名


invoke(ForkJoinTask t)

:提交任务并一直阻塞直到任务执行完成返回合并结果。


execute(ForkJoinTask t)

:异步执行任务,无返回值。


submit(ForkJoinTask t)

:异步执行任务,返回task本身,可以通过task.get()方法获取合并之后的 结果。

ForkJoinTask 在不显示使用 ForkJoinPool.execute/invoke/submit()方法进行执行的情况下,也可 以使用自己的fork/invoke方法进行执行



版权声明:本文为weixin_40980639原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。