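Example
import java.util.ArrayList;
import java.util.List;

List<Integer> list = new ArrayList<>(1);
list.add(1);
list.add(2);
// Print each element through a sequential stream
list.stream().forEach(System.out::println);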
The List.stream method
// java.util.Collection
default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}

@Override
default Spliterator<E> spliterator() {
    return Spliterators.spliterator(this, 0);
}

// java.util.Spliterators
public static <T> Spliterator<T> spliterator(Collection<? extends T> c,
                                             int characteristics) {
    return new IteratorSpliterator<>(Objects.requireNonNull(c),
                                     characteristics);
}
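List inherits stream() from the java.util.Collection interface. The default implementation passes two arguments to StreamSupport.stream: the spliterator returned by spliterator(), and false for the parallel flag.
spliterator() is also declared in Collection. Its default implementation calls the spliterator method of the final class Spliterators, which returns an IteratorSpliterator. (Concrete collections such as ArrayList override spliterator() with their own implementation.)
IteratorSpliterator is a static nested class that implements Spliterator.
Spliterator provides tryAdvance to process a single element, forEachRemaining to process all remaining elements, and trySplit to partition the source.
In other words, stream() relies on a Spliterator to traverse and split the elements.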
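The three Spliterator methods mentioned above can be tried directly on a list. The following is a minimal sketch, not JDK code; the class name SpliteratorDemo and its helper methods are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

// Hypothetical demo class; only the Spliterator calls come from the JDK.
class SpliteratorDemo {
    // Consumes one element with tryAdvance, then drains the rest with forEachRemaining.
    static List<Integer> traverse(List<Integer> source) {
        List<Integer> seen = new ArrayList<>();
        Spliterator<Integer> sp = source.spliterator();
        sp.tryAdvance(seen::add);        // handles at most one element
        sp.forEachRemaining(seen::add);  // handles everything that is left
        return seen;
    }

    // Splits the source once and returns the element counts of both parts.
    static long[] splitSizes(List<Integer> source) {
        Spliterator<Integer> rest = source.spliterator();
        Spliterator<Integer> prefix = rest.trySplit(); // null if the source cannot be split
        long prefixCount = (prefix == null) ? 0 : count(prefix);
        return new long[] {prefixCount, count(rest)};
    }

    // Counts the elements still covered by the given spliterator.
    static long count(Spliterator<Integer> sp) {
        long[] n = {0};
        sp.forEachRemaining(e -> n[0]++);
        return n[0];
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4);
        System.out.println(traverse(list));
        long[] sizes = splitSizes(list);
        System.out.println(sizes[0] + " + " + sizes[1] + " elements");
    }
}
```

How the elements are divided between the two parts depends on the spliterator implementation; the only thing the sketch relies on is that the two parts together cover every element exactly once.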
StreamSupport.stream
// java.util.stream.StreamSupport
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    return new ReferencePipeline.Head<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator),
                                        parallel);
}

// java.util.stream.ReferencePipeline.Head
Head(Spliterator<?> source,
     int sourceFlags, boolean parallel) {
    super(source, sourceFlags, parallel);
}

// java.util.stream.ReferencePipeline
ReferencePipeline(Spliterator<?> source,
                  int sourceFlags, boolean parallel) {
    super(source, sourceFlags, parallel);
}
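list.stream() ultimately instantiates a ReferencePipeline.Head object; Head is the head node of the pipeline.
Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT>, where E_IN is the type of the upstream input and E_OUT is the output type.
StreamOpFlag.fromCharacteristics(spliterator) converts the spliterator's characteristics into the corresponding stream flags (ORDERED, SORTED, and so on).
ReferencePipeline is an abstract class that extends AbstractPipeline and serves as the base class for pipeline stages whose elements are reference types.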
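Because StreamSupport.stream is public, the same entry point can be called by hand. The sketch below builds a stream from a list's spliterator and inspects the characteristics that feed the stream flags; the class name StreamSupportDemo and the helper streamOf are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical demo class; StreamSupport.stream and Spliterator are the real JDK APIs.
class StreamSupportDemo {
    // Same call that Collection.stream() makes, with the parallel flag exposed.
    static Stream<Integer> streamOf(List<Integer> source, boolean parallel) {
        return StreamSupport.stream(source.spliterator(), parallel);
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>();
        list.add(1);
        list.add(2);

        // Characteristics reported by the source spliterator
        Spliterator<Integer> sp = list.spliterator();
        System.out.println("ORDERED: " + sp.hasCharacteristics(Spliterator.ORDERED));
        System.out.println("SIZED:   " + sp.hasCharacteristics(Spliterator.SIZED));

        // The boolean argument decides whether the resulting stream is parallel
        System.out.println("parallel: " + streamOf(list, false).isParallel());
        System.out.println(streamOf(list, false).collect(Collectors.toList()));
    }
}
```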
AbstractPipeline
AbstractPipeline(Spliterator<?> source,
                 int sourceFlags, boolean parallel) {
    this.previousStage = null;
    this.sourceSpliterator = source;
    this.sourceStage = this;
    this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
    // The following is an optimization of:
    // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
    this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
    this.depth = 0;
    this.parallel = parallel;
}
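AbstractPipeline is the abstract base class for pipelines. It holds the links between stages: the source stage, the previous stage, and the next stage.
AbstractPipeline extends the abstract class PipelineHelper, which captures the information about a pipeline in one place: its operations, output shape, stream flags, and parallelism.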
forEach
void forEach(Consumer<? super T> action);
Performs an action for each element of the stream.
@Override
public void forEach(Consumer<? super E_OUT> action) {
    if (!isParallel()) {
        sourceStageSpliterator().forEachRemaining(action);
    }
    else {
        super.forEach(action);
    }
}
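This forEach override lives in ReferencePipeline.Head: a sequential stream iterates the source spliterator directly, while a parallel stream delegates to ReferencePipeline.forEach, which splits the work.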
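The two branches can be observed from user code. In this sketch (class and method names are hypothetical), the sequential call collects into a plain list, while the parallel call needs a thread-safe collection because the action may run on several threads and in no particular order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class contrasting sequential and parallel forEach.
class ForEachDemo {
    // Sequential branch: the action runs on the calling thread, one element at a time.
    static List<Integer> sequential(List<Integer> source) {
        List<Integer> out = new ArrayList<>();
        source.stream().forEach(out::add);
        return out;
    }

    // Parallel branch: the action may run concurrently, so the target must be thread-safe.
    static Set<Integer> parallel(List<Integer> source) {
        Set<Integer> out = ConcurrentHashMap.newKeySet();
        source.parallelStream().forEach(out::add);
        return out;
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(sequential(list));
        // Every element is visited, but the visiting order is not guaranteed
        System.out.println(parallel(list).size());
    }
}
```

When encounter order matters on a parallel stream, forEachOrdered is the documented alternative to forEach.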
Parallel forEach
// java.util.stream.ReferencePipeline
@Override
public void forEach(Consumer<? super P_OUT> action) {
    evaluate(ForEachOps.makeRef(action, false));
}

// java.util.stream.AbstractPipeline
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;
    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

// java.util.stream.TerminalOp (default implementation)
default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper,
                                  Spliterator<P_IN> spliterator) {
    if (Tripwire.ENABLED)
        Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default");
    return evaluateSequential(helper, spliterator);
}

// java.util.stream.ForEachOps.ForEachOp
@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                   Spliterator<S> spliterator) {
    return helper.wrapAndCopyInto(this, spliterator).get();
}
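For a parallel stream, forEach goes through evaluate, which runs the terminal operation against the pipeline to produce the result. The default TerminalOp.evaluateParallel shown here simply falls back to evaluateSequential; ForEachOps.ForEachOp overrides it to run the traversal as fork/join tasks.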
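The linkedOrConsumed check in evaluate is what makes a stream single-use. A small sketch of that behavior follows; the class name StreamReuseDemo and its method are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class showing that a consumed stream cannot be evaluated again.
class StreamReuseDemo {
    // Returns true if a second terminal operation on the same stream is rejected.
    static boolean secondTerminalOpFails(List<Integer> source) {
        // map() adds a stage after Head, so forEach is routed through evaluate()
        Stream<Integer> stream = source.stream().map(x -> x + 1);
        stream.forEach(x -> { });       // first terminal operation marks the stream as consumed
        try {
            stream.forEach(x -> { });   // second terminal operation on the same stream
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondTerminalOpFails(Arrays.asList(1, 2, 3)));
    }
}
```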