Java-ForkJoinPool详解

  • Post author:
  • Post category:java




ForkJoinPool介绍

今天慢慢猪和大家分享JDK1.7中出现的ForkJoinPool类。我们将从三个维度介绍ForkJoinPool。首先介绍ForkJoinPool出现的背景与思想,其次介绍ForkJoinPool使用方法以及性能比较,最后通过手写一个ForkJoinPool来展示其原理。



ForkJoinPool背景

  1. 想想如下场景,给定一个长度为10万的int型的数组,你需要计算出其中最大的值。按照最简单的方案便是通过依次遍历数组,获取最大的值。代码如下
static int maxArray(int[] arrays){
        int maxPos = 0;
        int maxValue = arrays[0];
        for (int i = 1; i < arrays.length; i++) {
            if (maxValue<arrays[i]){
                maxPos = i;
                maxValue = arrays[i];
            }
        }
        return maxPos;
    }

但是我们并没有充分发挥机器硬件的优势,因为现在的处理器大多数都是双核四核处理器,当我们用单线程去处理的时候,我们的另外三个处理器其实是空闲的。

2. 使用分而治之思想增加速度,我们的任务可以拆分成多个子任务,让每个CPU处理器并行处理子任务,最后将子任务合并放入。所以我们通过如下代码来实现更快的速度

static Task[] maxArray2(int[] arrays){
        //线程拆分成四个子任务
        CountDownLatch countDownLatch = new CountDownLatch(4);
        int pian = arrays.length/4;
        Task[] tasks = new Task[4];
        for (int i = 0; i < 4; i++) {
            tasks[i]= new Task(i,arrays,countDownLatch);
            new Thread(tasks[i]).start();
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return tasks;
    }
  1. 显然我们优化代码的核心思想其实就是通过将任务分配成更小的子任务,进行计算。最后再将计算结果进行合并计算。这样我们能够使CPU的使用效率达到最高值。但是此时仍然还有一个问题就是JDK中并没有现成的能够使用多线程来分解子任务的框架模型。所以在JDK1.7中出现了ForkJoinPool框架,而Fork:是分叉,分拆的意思。Join是归并其思想可以通过如下图进行展示

    在这里插入图片描述



ForkJoinPool使用

  1. forkJoinPool使用方法,在上一节中我们介绍到利用多线程充分使用CPU的性能,但是我们增加了程序的复杂度,需要将任务分而治之,并且在最后进行归总。由于多线程的存在,我们需要使用countdownLatch来保证线程都执行完毕。因此JDK提供了一套规范的封装,而我们只需要实现怎么样将任务分割,至于任务启动,以及任务原子性报错都交给ForkJoinPool框架来实现
  2. ForkJoinPool类的使用,这个是一个线程池,可以移步执行我们提交的任务
 ForkJoinPool forkJoinPool = new ForkJoinPool();
 Integer invoke = forkJoinPool.invoke(myTask);
  1. RecursiveTask类的使用,RecursiveTask类有一个抽象方法compute计算数据。它就是JDK提供让使用者描述当前的任务需要如何切分(fork),以及如何归约(join)。因此compute可以分为三个方面。
  • 最小粒度任务如何执行,当任务切分为最小任务时候如何执行计算
  • 如何拆分任务,按照什么样的规则来交给多线程执行
  • 如何归约任务,如何将任务进行 归约
class MyTask extends RecursiveTask<Integer> {

    int  start;
    int  end;
    int[] targetArray;

    public MyTask(int start, int end, int[] targetArray) {
        this.start = start;
        this.end = end;
        this.targetArray = targetArray;
    }

    @Override
    protected Integer compute() {
        if (this.end -this.start<100000){
            return BackGroudSitutaion.maxArray(this.targetArray,this.start,this.end);
        }else{
            int newBegin = this.start +10000;
            MyTask newTask = new MyTask( newBegin, this.end,this.targetArray);
            MyTask threadTask = new MyTask(this.start, newBegin - 1, this.targetArray);
            //任务分叉
            threadTask.fork();
            Integer compute = newTask.compute();
            //任务收集
            Integer join = threadTask.join();
            if (targetArray[compute]>targetArray[join]){
                return compute;
            }else{
                return join;
            }
        }
    }
}

上述代码中可以看出if判断是用于判断任务在最小粒度中如何执行任务,fork任务代表分叉执行,最后的join之后的处理比较大小则表明归约执行。

  • 所以RecursiveTask有了解三个API:fork:分叉任务执行,join执行任务完毕得到分叉结果,compute执行任务
  1. 在本例子中优化结果并不明显,甚至不如顺序执行。这是因为JIT编译器优化,CPU执行单片任务速度比分配线程时间更小。
  2. 正确使用例子:在计算任务中有IO操作,我们可以将计算任务放到一个线程下,IO操作放在另一个任务线程下从而增加执行速度。



ForkJoinPool与ThreadPool思考

  1. 共同点
  • 它们都是线程池中有多个线程,能够将任务进行执行
  1. 区别
  • ForkJoinPool是将一个任务分配为多个子任务来进行并发执行。ThreadPool是将不同任务放入队列进行分配执行



版权声明:本文为u014698349原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。