并行flatMap总是顺序的

Eug*_*ene 9 java java-8 java-stream java-9

假设我有这个代码:

 Collections.singletonList(10)
            .parallelStream() // .stream() - nothing changes
            .flatMap(x -> Stream.iterate(0, i -> i + 1)
                    .limit(x)
                    .parallel()
                    .peek(m -> {
                        System.out.println(Thread.currentThread().getName());
                    }))
            .collect(Collectors.toSet());
Run Code Online (Sandbox Code Playgroud)

输出是相同的线程名称,因此这里没有任何好处parallel- 我的意思是,有一个线程可以完成所有工作.

里面flatMap有这个代码:

result.sequential().forEach(downstream);
Run Code Online (Sandbox Code Playgroud)

我理解强制sequential属性如果"外部"流将是并行的(它们可能会阻塞),"外部"将不得不等待"flatMap"完成,反过来(因为使用相同的公共池)但为什么总是强迫吗?

这是那些在以后的版本中可能发生变化的事情之一吗?

Hol*_*ger 10

有两个不同的方面.

首先,只有一个管道是顺序的或并行的.在内部流中选择顺序或并行是无关紧要的.请注意,downstream您在引用的代码段中看到的使用者代表整个后续流管道,因此在您的代码中.collect(Collectors.toSet());,此消费者最终会将结果元素添加到一个Set非线程安全的实例中.因此,与该单个消费者并行处理内部流将破坏整个操作.

如果外部流被拆分,那么引用的代码可能会与不同的消费者同时调用,从而添加到不同的集合中.这些调用中的每一个都将处理外部流映射到不同内部流实例的不同元素.由于外部流仅由单个元素组成,因此无法拆分.

这个方法已经实现,也是为什么flatMap()之后的filter()在Java流中"不完全"懒惰的原因?问题,forEach在内部流上调用,它将所有元素传递给下游消费者.正如这个答案所证明的那样,支持懒惰和子流分裂的替代实现是可能的.但这是实现它的一种根本不同的方式.Stream实现的当前设计主要通过消费者组合来实现,因此最终,源分裂器(以及从它拆分的那些)接收Consumer代表整个流管道中的任一个tryAdvanceforEachRemaining.相比之下,链接答案的解决方案会进行分裂器组合,从而产生Spliterator对源分裂器的新委托.我认为,这两种方法都有优势,我不确定,在反过来工作时,OpenJDK实施会失去多少.

  • @ holi-java:缺少的懒惰可以看作是一个bug,并且已经有一个错误报告.然而,有限的并行性只是潜在性能改进的一个领域.在实践中,这仅影响具有外部流中的少量元素和更大内部流的流. (5认同)
  • @DmytroBuryak 我仍然希望有一个内置的解决方案...... (2认同)