krm*_*007 12 java parallel-processing java-8 java-stream
为什么下面的代码不打印任何输出,而如果我们删除并行,它打印0,1?
IntStream.iterate(0, i -> ( i + 1 ) % 2)
.parallel()
.distinct()
.limit(10)
.forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)
虽然我知道理想的限制应该放在不同之前,但我的问题更多地与添加并行处理引起的差异有关.
真正的原因是有序并行 .distinct()
是完整的屏障操作,如文档中所述:
保持
distinct()
并行管道的稳定性相对昂贵(要求操作充当完全屏障,具有大量缓冲开销),并且通常不需要稳定性.
"全屏障操作"意味着必须在下游开始之前执行所有上游操作.Stream API中只有两个完整的屏障操作:( .sorted()
每次)和.distinct()
(按顺序并行的情况).因为你有非短路无限流提供给.distinct()
你最终无限循环.通过契约.distinct()
不能以任何顺序向下游发射元素:它应该总是发出第一个重复元素.虽然理论上可以.distinct()
更好地实现并行排序,但实现起来要复杂得多.
至于解决方案,@ user140547是正确的:.unordered()
在.distinct()
此切换distinct()
算法之前添加无序序列(仅使用共享ConcurrentHashMap
来存储所有观察到的元素并将每个新元素发送到下游).请注意,添加.unordered()
后将 .distinct()
无济于事.
Stream.iterate返回“无限顺序有序流”。因此,使顺序流并行并没有太大用处。
根据 Stream包的描述:
对于并行流,放宽排序约束有时可以实现更高效的执行。如果元素的排序不相关,则可以更有效地实现某些聚合操作,例如过滤重复项 (distinct()) 或分组缩减 (Collectors.groupingBy())。同样,本质上与遇到顺序相关的操作(例如 limit())可能需要缓冲来确保正确的顺序,从而削弱了并行性的优势。如果流具有遇到顺序,但用户并不特别关心该遇到顺序,则使用 unordered() 显式对流进行解序可能会提高某些有状态或终端操作的并行性能。然而,大多数流管道,例如上面的“块权重之和”示例,即使在排序约束下仍然可以有效地并行化。
您的情况似乎就是这种情况,使用 unordered(),它会打印 0,1。
IntStream.iterate(0, i -> (i + 1) % 2)
.parallel()
.unordered()
.distinct()
.limit(10)
.forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)