Java 8 Stream - 并行执行 - 不同的结果 - 为什么?

Rob*_*roj 12 java parallel-processing java-8 java-stream

比方说,我有一个List<Integer> ints = new ArrayList<>();,我想将值添加到它,并使用比较并行执行的结果forEach()Collectors.toList().

首先,我将一些来自顺序IntStream和forEach的值添加到此列表中:

 IntStream.range(0,10).boxed().forEach(ints::add);
Run Code Online (Sandbox Code Playgroud)

我得到了正确的结果:

ints ==> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Run Code Online (Sandbox Code Playgroud)

现在我.clear()在列表中并行执行相同的操作:

IntStream.range(0,10).parallel().boxed().forEach(ints::add);
Run Code Online (Sandbox Code Playgroud)

现在由于多线程,我得到了错误的结果:

ints ==> [6, 5, 8, 9, 7, 2, 4, 3, 1, 0]
Run Code Online (Sandbox Code Playgroud)

现在我切换到收集相同的整数流:

IntStream.range(0,10).parallel().boxed().collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

我得到了正确的结果:

ints ==> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Run Code Online (Sandbox Code Playgroud)

问题: 为什么两个并行执行产生不同的Collector结果?为什么产生正确的结果?

如果forEach产生随机结果,也Collector应该.我没有指定任何排序,我认为他在内部添加到我手动使用的列表forEach.由于他并行执行,因此他的add方法应该以未指定的顺序获取值.测试完成了JShell.

编辑:这里没有重复.我理解相关的问题.收集器为什么产生正确的结果?如果他将产生另一个随机结果,我不会问.

tep*_*pic 11

collect操作如果产生无序的输出Collector,你通过它有不同的特点.也就是说,如果设置了CONCURRENTUNORDERED标志(请参阅参考资料Collector.characteristics()).

引擎盖Collectors.toList()下构建的Collector大致相当于:

Collector.of(
    // Supplier of accumulators
    ArrayList::new,
    // Accumulation operation
    List::add,
    // Combine accumulators
    (left, right) -> {
        left.addAll(right);
        return left;
    }
)
Run Code Online (Sandbox Code Playgroud)

一些日志记录显示了collect操作将保持线程安全性和流顺序的长度:

Collector.of(
    () -> {
        System.out.printf("%s supplying\n", Thread.currentThread().getName());
        return new ArrayList<>();
    },
    (l, o) -> {
        System.out.printf("%s accumulating %s to %s\n", Thread.currentThread().getName(), o, l);
        l.add(o);
    },
    (l1, l2) -> {
        System.out.printf("%s combining %s & %s\n", Thread.currentThread().getName(), l1, l2);
        l1.addAll(l2);
        return l1;
    }
)
Run Code Online (Sandbox Code Playgroud)

日志:

ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-0 accumulating 2 to []
ForkJoinPool-1-worker-1 accumulating 6 to []
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-0 accumulating 4 to []
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-1 accumulating 5 to []
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-0 accumulating 3 to []
ForkJoinPool-1-worker-0 combining [3] & [4]
ForkJoinPool-1-worker-0 combining [2] & [3, 4]
ForkJoinPool-1-worker-1 combining [5] & [6]
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-0 accumulating 1 to []
ForkJoinPool-1-worker-1 accumulating 8 to []
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-1 accumulating 9 to []
ForkJoinPool-1-worker-1 combining [8] & [9]
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-1 accumulating 7 to []
ForkJoinPool-1-worker-1 combining [7] & [8, 9]
ForkJoinPool-1-worker-1 combining [5, 6] & [7, 8, 9]
ForkJoinPool-1-worker-0 accumulating 0 to []
ForkJoinPool-1-worker-0 combining [0] & [1]
ForkJoinPool-1-worker-0 combining [0, 1] & [2, 3, 4]
ForkJoinPool-1-worker-0 combining [0, 1, 2, 3, 4] & [5, 6, 7, 8, 9]
Run Code Online (Sandbox Code Playgroud)

您可以看到从流中读取的每个内容都写入新的累加器,并且它们经过仔细组合以维护顺序.

如果我们设置CONCURRENTUNORDERED特征标志,收集方法可以自由采取捷径; 只分配一个累加器,不需要有序组合.

使用:

Collector.of(
    () -> {
        System.out.printf("%s supplying\n", Thread.currentThread().getName());
        return Collections.synchronizedList(new ArrayList<>());
    },
    (l, o) -> {
        System.out.printf("%s accumulating %s to %s\n", Thread.currentThread().getName(), o, l);
        l.add(o);
    },
    (l1, l2) -> {
        System.out.printf("%s combining %s & %s\n", Thread.currentThread().getName(), l1, l2);
        l1.addAll(l2);
        return l1;
    },
    Characteristics.CONCURRENT,
    Characteristics.UNORDERED
)
Run Code Online (Sandbox Code Playgroud)

日志:

ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-1 accumulating 6 to []
ForkJoinPool-1-worker-0 accumulating 2 to [6]
ForkJoinPool-1-worker-1 accumulating 5 to [6, 2]
ForkJoinPool-1-worker-0 accumulating 4 to [6, 2, 5]
ForkJoinPool-1-worker-0 accumulating 3 to [6, 2, 5, 4]
ForkJoinPool-1-worker-0 accumulating 1 to [6, 2, 5, 4, 3]
ForkJoinPool-1-worker-0 accumulating 0 to [6, 2, 5, 4, 3, 1]
ForkJoinPool-1-worker-1 accumulating 8 to [6, 2, 5, 4, 3, 1, 0]
ForkJoinPool-1-worker-0 accumulating 7 to [6, 2, 5, 4, 3, 1, 0, 8]
ForkJoinPool-1-worker-1 accumulating 9 to [6, 2, 5, 4, 3, 1, 0, 8, 7]
Run Code Online (Sandbox Code Playgroud)