Java8–Stream 并行流详解

  • Post author:
  • Post category:java




简介

并行流就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。串行流则相反,并行流的底层其实就是ForkJoin框架的一个实现。


java.util.Collection < E >

新添加了两个默认方法

  • default Stream stream() : 返回串行流
  • default Stream parallelStream() : 返回并行流

将一个并行流转成顺序的流只要调用sequential()方法

stream.parallel() .filter(…) .sequential() .map(…) .parallel() .reduce();

这两个方法可以多次调用, 只有最后一个调用决定这个流是顺序的还是并发的。

想要明白并行流,那么就必须了解ForkJion框架



Fork/Join框架介绍

Fork/Join框架时java7提供的一个用于并行执行任务的框架:就是在必要的情况下,将一个大任务,进行拆分成若干个小任务(拆到不可再拆时),再将一个个小的任务运算的结果进行jion汇总。

在这里插入图片描述

注意:Fork/Jion框架使用的默认线程数等于你机器的处理器核心数

通过这个方法可以修改这个值,而且这个还是全局属性,不过建议一般不修改

System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”, “12”);



工作窃取模式

Fork/Join框架它所使用的线程模式—-

工作窃取模式

。每个线程都会为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执行。基于种种原因,某个线程可能很早就完成了分配给它的任务,而其他的线程还未完成,那么这个线程就会,随机选一个线程,从队列的尾巴上“偷走一个任务”。这个过程一直继续下去,直到所有的任务都执行完毕,所有的队列都清空。

在这里插入图片描述



使用Fork/Join框架

  1. 首先定义一个类,去继承

    RecursiveTask

    或者

    RecursiveAction
public class ForkJionCalculate extends RecursiveTask<Long> implements Serializable {
    private static final long serialVersionUID = 2462375556031755900L;
    
    @Override
    protected Long compute() {
        return null;
    }
}
-------------------------------------------------
public class ForkJionCalculate extends RecursiveAction implements Serializable {
    private static final long serialVersionUID = 2462375556031755900L;


    @Override
    protected void compute() {
        
    }
}

从上面的代码中我们可以看出

RecursiveTask



RecursiveAction

都有一个抽象方法

compute()

,只是

RecursiveTask

有返回值,

RecursiveAction

没有返回值,类时于Runnable和Callable

下面以

RecursiveTask

为例子

public class ForkJionCalculate extends RecursiveTask<Long> implements Serializable {
    private static final long serialVersionUID = 2462375556031755900L;


    private long start;

    private long end;

    private static final long THRESHOLD = 10000L;//临界值

    public ForkJionCalculate(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long length = end -start;
        if(length<=THRESHOLD){
            long sum =0;
            for (long i = start; i <=end ; i++) {
                sum+=i;
            }
            return sum;
        }else {
            long middle =(start+end)/2;
            ForkJionCalculate left = new ForkJionCalculate(start, middle);
            left.fork();//拆分,并将该子任务压入线程队列
            ForkJionCalculate right = new ForkJionCalculate(middle + 1, end);
            right.fork();
            return left.join()+right.join();
        }
    }
}

接下来测试一下,要执行一个ForkJoin的任务,首先建一个线程池ForkJoinPool,这个跟我们的普通的线程池使用上很像,因为它们的祖先都是ExecutorService

 @org.junit.Test
    public void test(){
        Instant start = Instant.now();

        ForkJoinPool pool = new ForkJoinPool();
        ForkJionCalculate task = new ForkJionCalculate(0, 10000000000L);
        Long sum = pool.invoke(task);
        System.out.println(sum);
        Instant end = Instant.now();
        System.out.println("耗费时间"+Duration.between(start,end).toMillis());//2046
    }

然后,我们用传统的循环来比较

@org.junit.Test
    public void test1(){
        Instant start = Instant.now();
        long sum = 0L;
        for (long i = 0L; i < 10000000000L; i++) {
            sum+=i;
        }
        System.out.println(sum);
        Instant end = Instant.now();
        System.out.println("耗费时间"+Duration.between(start,end).toMillis());//3561
    }

可以看出ForkJoin的效率高很多

注意:使用ForkJoin时,任务的量一定要大,否则太小,由于任务拆分也会消耗时间,它执行的效率不一定比for循环高

在这里插入图片描述

最后我们用Stream的并行流来测试(内部实现的是ForkJoin)

@org.junit.Test
    public void test3(){
        Instant start = Instant.now();
        long sum = LongStream.rangeClosed(0, 10000000000L).parallel().sum();
        System.out.println(sum);
        Instant end = Instant.now();
        System.out.println("耗费时间"+Duration.between(start,end).toMillis());//1421
    }

可以看出也是比传统的要快

注意:上面代码上我们使用的是LongStream来生成数据

( rangeClosed:需要传入开始节点和结束节点两个参数,返回的是一个有序的LongStream。包含开始节点和结束节点两个参数之间所有的参数,间隔为1

range:同理

区别就是rangeClosed包含最后的结束节点,range不包含。)

接下来我们使用迭代的方式生成

 @org.junit.Test
    public void test4(){
        Instant start = Instant.now();
        Long reduce = Stream.iterate(1L, x -> x + 1).limit(10000000000L).parallel().reduce(0L, Long::sum);
        System.out.println(reduce);
        Instant end = Instant.now();
        System.out.println("耗费时间"+Duration.between(start,end).toMillis());
    }

直接报错 超出内存(我16G)

所以在使用并行流的时候需要注意:

  1. 留意装箱。自动装箱和拆箱操作会大大降低性能。Java8中有原始类型流(IntStream、LongStream、DoubleStream)来避免这种操作,但凡有可能都应该使用这些原始流。
  2. 有些操作本身在并行流上的性能就比顺序流差。特别是limit、findFirst等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。例如,findAny会比findFirst性能好,因为它不一定要顺序来执行。你总是可以调用unordered方法来把顺序流变成无须流。那么,如果你需要流中的n个元素而不是专门的前n个的话,对无序并行流调用limit可能会比单个有序流(比如数据源是List)更高效。
  3. 还要考虑流的操作流水线的总计算成本。设N是要处理的元素的总数,Q是一个元素通过流水线的大致处理成本,则N*Q就是这个对成本的一个粗略的定型估计。Q值较高就意味着使用并行流时性能好的可能性比较大。
  4. 对于较小的数据量,选择并行流几乎从来不都是一个好的选择。并行处理少数几个元素的好处还抵不上并行化造成的额外开销。
  5. 要考虑流背后的数据结构是否易于分解。例如,ArrayList的拆分效率比LinkedList高很多,因为前者用不着遍历就可以平均拆分,而后者则必须遍历。另外,用range工厂方法创建的原始类型流也可以快速分解。最后,你可以自己实现Spliterator来完全掌握分解过程。
  6. 流自身的特点,以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。例如,一个SIZED流可以分成大小相等的两部分,这样每个部分都可以比较高效地并行处理,但筛选操作可能丢弃的元素个数却无法预测,导致流本身的大小未知。
  7. 还要考虑终端操作中合并步骤的代价是大是小(例如Collector中的combiner方法)。如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超出通过并行流得到的性能提升。

    下面的是我的公众号二维码图片,欢迎关注。

    在这里插入图片描述



版权声明:本文为YCJ_xiyang原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。