并行流的行为与Stream不同

Cla*_*iga 15 java parallel-processing java-stream

我无法理解为什么并行流和流为完全相同的语句提供不同的结果.

    List<String> list = Arrays.asList("1", "2", "3");
    String resultParallel = list.parallelStream().collect(StringBuilder::new,
            (response, element) -> response.append(" ").append(element),
            (response1, response2) -> response1.append(",").append(response2.toString()))
            .toString();
    System.out.println("ResultParallel: " + resultParallel);

    String result = list.stream().collect(StringBuilder::new,
            (response, element) -> response.append(" ").append(element),
            (response1, response2) -> response1.append(",").append(response2.toString()))
            .toString();

    System.out.println("Result: " + result);
Run Code Online (Sandbox Code Playgroud)

ResultParallel:1,2,3

结果:1 2 3

有人可以解释为什么会发生这种情况以及如何让非并行版本得到与并行版本相同的结果?

Ole*_*hov 12

Java 8 Stream.collect方法具有以下签名:

<R> R collect(Supplier<R> supplier,
              BiConsumer<R, ? super T> accumulator,
              BiConsumer<R, R> combiner);
Run Code Online (Sandbox Code Playgroud)

其中BiConsumer<R, R> combiner仅在并行数据流被称为(为了部分结果组合成单个容器),因此第一个代码段的输出为:

ResultParallel: 1, 2, 3
Run Code Online (Sandbox Code Playgroud)

sequential版本combiner中没有被调用(请参阅此答案),因此忽略以下语句:

(response1, response2) -> response1.append(",").append(response2.toString())
Run Code Online (Sandbox Code Playgroud)

结果不同:

1 2 3
Run Code Online (Sandbox Code Playgroud)

怎么解决?检查@Eugene的答案或这个问题和答案.

  • 这是来自Stuart Marks的[答案](/sf/ask/2044712351/#answer-29295055). (3认同)

Ste*_*n C 8

要理解为什么会出错,请从javadoc中考虑这一点.

accumulator - 一个关联的,非干扰的无状态函数,必须将元素折叠到结果容器中.

combiner- 一个关联的,非干扰的无状态函数,它接受两个部分结果容器并合并它们,它们必须与累加器函数兼容.组合器函数必须将元素从第二个结果容器折叠到第一个结果容器中.

这就是说,元素是否通过"累积"或"组合"或两者的某种组合来收集并不重要.但是在您的代码中,累加器和组合器使用不同的分隔符进行连接.它们在javadoc所要求的意义上不是"兼容的".

这会导致结果不一致,具体取决于是使用顺序流还是并行流.

  • 在并行的情况下,流被分成子流1以由不同的线程处理.这导致每个子流的单独收集.然后合并这些集合.

  • 在顺序情况下,流不会被拆分.相反,流简单地累积到单个集合中,并且不需要进行组合.


观察:

  • 通常,对于执行简单转换的此大小的流,parallelStream()可能会使事情变慢.

  • 在这种特定情况下,parallelStream()版本的瓶颈将是组合步骤.这是一个连续步骤,它执行与整个串行管道相同的复制量.所以,事实上,并行化肯定会让事情变得更慢.

  • 实际上,lambdas行为不正确.它们在开始时添加了额外的空间,如果combiner使用的话,它会加倍一些空格.更正确的版本是:

    String result = list.stream().collect(
        StringBuilder::new,
        (b, e) -> b.append(b.isEmpty() ? "" : " ").append(e),
        (l, r) -> l.append(l.isEmpty() ? "" : " ").append(r)).toString();
    
    Run Code Online (Sandbox Code Playgroud)
  • Joiner班是一个更简单,更有效的来连接流路.(图片来源:@Eugene)


1 - 在这种情况下,每个子流只有一个元素.对于更长的列表,通常会获得与工作线程一样多的子流,并且子流将包含多个元素.


Eug*_*ene 7

作为旁注,即使你,用空格替换combiner,你的结果仍然会有所不同(稍微修改了代码以使其更具可读性):

String resultParallel = list.parallelStream().collect(
            StringBuilder::new,
            (builder, elem) -> builder.append(" ").append(elem),
            (left, right) -> left.append(" ").append(right)).toString();

    String result = list.stream().collect(
            StringBuilder::new,
            (builder, elem) -> builder.append(" ").append(elem),
            (left, right) -> left.append(" ").append(right)).toString();


  System.out.println("ResultParallel: ->" + resultParallel + "<-"); // -> 1  2  3  4<-
  System.out.println("Result: ->" + result + "<-"); // -> 1 2 3 4<-
Run Code Online (Sandbox Code Playgroud)

注意你有多少空格.

java-doc有提示:

组合器...必须与累加器功能兼容

如果你想加入,有更简单的选项,如:

String.join(",", yourList)
yourList.stream().collect(Collectors.joining(","))
Run Code Online (Sandbox Code Playgroud)