来自HashSet的并行流不会并行运行

Nem*_*emo 20 java parallel-processing lambda java-8 java-stream

我有要并行处理的元素集合.当我使用a时List,并行性有效.但是,当我使用a时Set,它并不是并行运行的.

我写了一个显示问题的代码示例:

public static void main(String[] args) {
    ParallelTest test = new ParallelTest();

    List<Integer> list = Arrays.asList(1,2);
    Set<Integer> set = new HashSet<>(list);

    ForkJoinPool forkJoinPool = new ForkJoinPool(4);

    System.out.println("set print");
    try {
        forkJoinPool.submit(() ->
            set.parallelStream().forEach(test::print)
        ).get();
    } catch (Exception e) {
        return;
    }

    System.out.println("\n\nlist print");
    try {
        forkJoinPool.submit(() ->
            list.parallelStream().forEach(test::print)
        ).get();
    } catch (Exception e) {
        return;
    }   
}

private void print(int i){
    System.out.println("start: " + i);
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    }
    System.out.println("end: " + i);
}
Run Code Online (Sandbox Code Playgroud)

这是我在Windows 7上获得的输出

set print
start: 1
end: 1
start: 2
end: 2

list print
start: 2
start: 1
end: 1
end: 2
Run Code Online (Sandbox Code Playgroud)

我们可以看到在Set处理第二个元素之前必须完成的第一个元素.对于List,第二个元素在第一个元素完成之前开始.

你能告诉我导致这个问题的原因,以及如何使用Set集合来避免它?

Stu*_*rks 31

我可以重现您看到的行为,其中并行性与您指定的fork-join池并行性的并行性不匹配.在将fork-join pool parallelism设置为10并将集合中的元素数量增加到50之后,我看到基于列表的流的并行性仅上升到6,而基于集合的流的并行性从未超过2.

但请注意,将任务提交到fork-join池以在该池中运行并行流的这种技术是一种实现"技巧",并不能保证能够正常工作.实际上,未指定用于执行并行流的线程或线程池.默认情况下,使用公共fork-join池,但在不同的环境中,最终可能会使用不同的线程池.(考虑应用程序服务器中的容器.)

java.util.stream.AbstractTask类中,该LEAF_TARGET字段确定完成的拆分量,这反过来又决定了可以实现的并行度.此字段的值基于ForkJoinPool.getCommonPoolParallelism()当然使用公共池的并行性,而不是任何池正在运行任务.

可以说这是一个错误(参见OpenJDK问题JDK-8190974),但是,无论如何,整个区域都未指定.但是,系统的这个区域肯定需要开发,例如在拆分策略,可用并行数量,处理阻塞任务以及其他问题方面.JDK的未来版本可能会解决其中一些问题.

同时,可以通过使用系统属性来控制公共fork-join池的并行性.如果您将此行添加到您的程序,

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");
Run Code Online (Sandbox Code Playgroud)

并且您在公共池中运行流(或者如果您将它们提交到具有足够高并行度设置的池中),您将观察到更多任务并行运行.

您还可以使用该-D选项在命令行上设置此属性.

同样,这不是保证行为,并且可能在将来发生变化.但是这种技术可能适用于可预见的未来的JDK 8实现.

  • @DimitarDimitrov我认为这比你如何做到这一点简单."可以说这是一个错误"声明是关于流中的分裂行为.它总是根据公共池的并行性进行拆分.但是如果流是针对另一个池(使用未记录的hack),则拆分仍然受公共池的并行性控制,而不是目标池的并行性. (3认同)