Java中的无限流并行处理

krm*_*007 12 java parallel-processing java-8 java-stream

为什么下面的代码不打印任何输出,而如果我们删除并行,它打印0,1?

IntStream.iterate(0, i -> ( i + 1 ) % 2)
         .parallel()
         .distinct()
         .limit(10)
         .forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

虽然我知道理想的限制应该放在不同之前,但我的问题更多地与添加并行处理引起的差异有关.

Tag*_*eev 7

真正的原因是有序并行 .distinct()是完整的屏障操作,文档中所述:

保持distinct()并行管道的稳定性相对昂贵(要求操作充当完全屏障,具有大量缓冲开销),并且通常不需要稳定性.

"全屏障操作"意味着必须在下游开始之前执行所有上游操作.Stream API中只有两个完整的屏障操作:( .sorted()每次)和.distinct()(按顺序并行的情况).因为你有非短路无限流提供给.distinct()你最终无限循环.通过契约.distinct()不能以任何顺序向下游发射元素:它应该总是发出第一个重复元素.虽然理论上可以.distinct()更好地实现并行排序,但实现起来要复杂得多.

至于解决方案,@ user140547是正确的:.unordered().distinct()此切换distinct()算法之前添加无序序列(仅使用共享ConcurrentHashMap来存储所有观察到的元素并将每个新元素发送到下游).请注意,添加.unordered() 后将 .distinct()无济于事.


use*_*547 5

Stream.iterate返回“无限顺序有序流”。因此,使顺序流并行并没有太大用处。

根据 Stream包的描述:

对于并行流,放宽排序约束有时可以实现更高效的执行。如果元素的排序不相关,则可以更有效地实现某些聚合操作,例如过滤重复项 (distinct()) 或分组缩减 (Collectors.groupingBy())。同样,本质上与遇到顺序相关的操作(例如 limit())可能需要缓冲来确保正确的顺序,从而削弱了并行性的优势。如果流具有遇到顺序,但用户并不特别关心该遇到顺序,则使用 unordered() 显式对流进行解序可能会提高某些有状态或终端操作的并行性能。然而,大多数流管道,例如上面的“块权重之和”示例,即使在排序约束下仍然可以有效地并行化。

您的情况似乎就是这种情况,使用 unordered(),它会打印 0,1。

    IntStream.iterate(0, i -> (i + 1) % 2)
            .parallel()
            .unordered()
            .distinct()
            .limit(10)
            .forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)