parallelStream()使用及避坑

  • Post author:
  • Post category:其他


在我们经常写的业务代码中很多时候会出现遍历循环的情况,比如取集合数据、封装集合数据等等,Java 8引入了一种新的Stream API,其中包括一个名为parallelStream()的方法。

什么是parallelStream()?

parallelStream()是Java 8 Stream API中的一种方法,它允许在多个线程上并行执行流操作。这使得我们能够更有效地利用现代多核处理器的能力。

为什么使用parallelStream()?

使用parallelStream()的主要优点是可以极大地提高程序的性能。在处理大量数据时,可以将数据分成多个部分,并在不同的处理器核心上并行处理每个部分。这样可以减少处理时间,并使应用程序更快。

如何使用parallelStream()?

使用parallelStream()很简单。只需要将流对象转换为并行流,就可以让Java在多个线程上同时执行流操作。下面是一个简单的例子

正常forEach()输出结果相比不用多说,大家也能预测到,但是用parallelStream()之后,因为是并行处理,实际输出并不是按照顺序

根据输出结果可以看出,parallelStream()并行原理是使用了ForkJoinPool。所以要想深入的研究parallelStream之前,那么我们必须先了解ForkJoin框架和ForkJoinPool,有兴趣可以深入了解一下ForkJoin。

重复消费问题

在使用parallelStream()时,有一个常见的问题是重复消费。当在并行流中使用一些会改变流源的操作时(比如sorted()、distinct()等),如果这些操作在多个线程上并行执行,那么就有可能导致重复消费。这是因为并行流需要对数据进行分片,不同线程处理的分片可能会包含相同的元素,从而导致相同的元素被处理多次。

下面是一个示例代码,它演示了在使用parallelStream()时可能会遇到的重复消费问题:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);

List<Integer> result = numbers.parallelStream()
                                .map(x -> x * 2)
                                .sorted()
                                .distinct()
                                .collect(Collectors.toList());

System.out.println(result);

在上面的代码中,我们在并行流中对列表中的元素进行了映射、排序和去重。由于这些操作在多个线程上并行执行,因此可能会导致重复消费问题。具体来说,当不同的线程在处理数据分片时,可能会处理相同的元素,从而导致相同的元素被处理多次。这样就可能导致最终的结果包含重复的元素。

为了避免重复消费问题,可以使用以下方法之一:

1. 避免在流操作中修改流源。

在使用parallelStream()时,应该尽量避免在流操作中修改流源。如果必须修改流源,可以考虑使用线程安全的数据结构来避免竞争条件。

2. 使用无状态操作。

无状态操作是指操作不依赖于之前的操作结果,也不会修改状态的操作。例如,map()、filter()等操作就是无状态操作。使用无状态操作可以避免重复消费问题,因为它们不需要保存任何状态或上下文。



版权声明:本文为freeStyle_liu原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。