Stream reduce()要求究竟需要什么?

Mau*_*ice 8 java parallel-processing reduce java-8 java-stream

reduce()在并行流上使用该操作时,OCP考试书指出reduce()参数必须遵守某些原则.这些论点如下:

  1. 必须定义标识,使得对于流u中的所有元素,combiner.apply(identity,u)等于u.
  2. 累加器运算符op必须是关联的和无状态的,(a op b) op c等于a op (b op c).
  3. 组合器运算符还必须是关联的和无状态的并且与标识兼容,以便对于所有ut combiner.apply(u, accumulator.apply(identity, t))等于accumulator.apply(u,t).

考试书提供了两个例子来说明这些原则,请参阅下面的代码:

关联的示例:

System.out.println(Arrays,asList(1,2,3,4,5,6))
.parallelStream()
.reduce(0,(a,b) -> (a-b))); //NOT AN ASSOCIATIVE ACCUMULATOR
Run Code Online (Sandbox Code Playgroud)

OCP书中对此有何说法:

它可能会输出-21,3或其他一些值,因为累加器函数违反了associativity属性.

身份要求的示例:

System.out.println(Arrays.asList("w","o","l","f"))
.parallelStream()
.reduce("X", String::concat));
Run Code Online (Sandbox Code Playgroud)

OCP书中对此有何说法:

如果我们使用的身份参数不是真正的身份值,您可以看到其他问题.它可以输出XwXoXlXf.作为并行过程的一部分,标识将应用于流中的多个元素,从而导致非常意外的数据.

我不明白这些例子.使用累加器示例,累加器以0 -1 = -1然后-1 -2开始,其中= -3然后-6等等一直到-21.我明白,因为生成的arraylist不同步,结果可能是不可预测的,因为竞争条件等的可能性,但为什么累加器不关联?Woulden也不会(a+b)导致不可预测的结果?我真的没有看到示例中使用的累加器有什么问题,以及为什么它不是关联的,但是我仍然不能完全理解关联原则是什么.

我也不了解身份的例子.据我所知,如果4个独立的线程同时开始与身份一起累积,那么结果确实可能是XwXoXlXf,但这与身份参数本身有什么关系呢?究竟什么是适当的身份才能使用呢?

我想知道是否有人可以更多地了解这些原则.

谢谢

Era*_*ran 5

为什么累加器不具有关联性?

它不是关联的,因为减法运算的顺序决定了最终结果。

如果运行serial Stream,将得到以下预期结果:

0 - 1 - 2 - 3 - 4 - 5 - 6 = -21
Run Code Online (Sandbox Code Playgroud)

另一方面,对于parallel Stream,工作被拆分为多个线程。例如,如果reduce在6个线程上并行执行,然后合并中间结果,则可以获得不同的结果:

0 - 1   0 - 2   0 - 3      0 - 4     0 - 5    0 - 6
  -1     -2      -3         -4        -5        -6

  -1 - (-2)         -3 - (-4)          -5 - (-6)
      1                 1                  1
           1   -   1
               0            -     1

                        -1
Run Code Online (Sandbox Code Playgroud)

或者,举一个简短的例子:

(1 - 2) - 3 = -4
1 - (2 - 3) =  2
Run Code Online (Sandbox Code Playgroud)

因此,减法不具有关联性。

另一方面,a+b不会引起相同的问题,因为加法是关联运算符(即(a+b)+c == a+(b+c))。

身份示例的问题在于,当在多个线程上并行执行reduce时,会将“ X”附加到每个中间结果的开头。

那么要使用的正确身份到底是什么?

如果将标识值更改为""

System.out.println(Arrays.asList("w","o","l","f"))
.parallelStream()
.reduce("", String::concat));
Run Code Online (Sandbox Code Playgroud)

您会得到“狼”而不是“ XwXoXlXf”。

  • @Hasnain我不知道。这是一个实现细节。 (2认同)

Eug*_*ene 5

让我举两个例子。首先是身份被破坏的地方:

int result = Stream.of(1, 2, 3, 4, 5, 6)
            .parallel()
            .reduce(10, (a, b) -> a + b);

    System.out.println(result); // 81 on my run
Run Code Online (Sandbox Code Playgroud)

基本上,您已经违反了以下规则:The identity value must be an identity for the accumulator function.  This means that for all u, accumulator(identity, u) is equal to u

或更简单地说,让我们看看该规则是否适用于Stream中的一些随机数据:

 Integer identity = 10;
 BinaryOperator<Integer> combiner = (x, y) -> x + y;
 boolean identityRespected = combiner.apply(identity, 1) == 1;
 System.out.println(identityRespected); // prints false
Run Code Online (Sandbox Code Playgroud)

第二个例子:

/**
 * count letters, adding a bit more all the time
 */
private static int howMany(List<String> tokens) {
    return tokens.stream()
            .parallel()
            .reduce(0, // identity
                    (i, s) -> { // accumulator
                        return s.length() + i;
                    }, (left, right) -> { // combiner
                        return left + right + left; // notice the extra left here
                    });
}
Run Code Online (Sandbox Code Playgroud)

然后使用以下命令调用它:

    List<String> left = Arrays.asList("aa", "bbb", "cccc", "ddddd", "eeeeee");
    List<String> right = Arrays.asList("aa", "bbb", "cccc", "ddddd", "eeeeee", "");

    System.out.println(howMany(left));  // 38 on my run
    System.out.println(howMany(right)); // 50 on my run
Run Code Online (Sandbox Code Playgroud)

基本上,您已经违反了此规则:Additionally, the combiner function must be compatible with the accumulator function或在代码中:

 // this must hold!
 // combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)

    Integer identity = 0;
    String t = "aa";
    Integer u = 3; // "bbb"
    BiFunction<Integer, String, Integer> accumulator = (Integer i, String s) -> i + s.length();
    BinaryOperator<Integer> combiner = (left, right) -> left + right + left;

    int first = accumulator.apply(identity, t); // 2
    int second = combiner.apply(u, first); // 3 + 2 + 3 = 8

    Integer shouldBe8 = accumulator.apply(u, t);

    System.out.println(shouldBe8 == second); // false
Run Code Online (Sandbox Code Playgroud)

  • @JasonLaw 引用是正确的。它与第一个示例中使用的“reduce(T Identity, BinaryOperator&lt;T&gt; Accumulator)”文档中所写的完全相同。这个方法甚至没有 *combiner* 功能。不要与方法“reduce(Uidentity, BiFunction&lt;U,? super T,U&gt; accumulator, BinaryOperator&lt;U&gt;combiner)”混淆。 (4认同)

Jul*_*egg 5

虽然这个问题已经得到了回答和接受,但我认为可以用更简单、更实用的方式来回答。

如果您没有有效identity的关联累加器/组合器,则reduce操作的结果将取决于:

  1. Stream内容
  2. 处理线程的数量 Stream

关联性

让我们尝试一个非关联累加器/组合器的例子(基本上,我们通过改变线程数来减少一个序列中的 50 个数字的列表并并行):

System.out.println("sequential: reduce="+
    IntStream.rangeClosed(1, 50).boxed()
        .reduce(
            0, 
            (a,b)->a-b, 
            (a,b)->a-b));
for (int n=1; n<6; n++) {
    ForkJoinPool pool = new ForkJoinPool(n);
    final int finalN = n;
    try {
        pool.submit(()->{
            System.out.println(finalN+" threads : reduce="+
                IntStream.rangeClosed(1, 50).boxed()
                    .parallel()
                    .reduce(
                        0, 
                        (a,b)->a-b, 
                        (a,b)->a-b));
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }
    }
Run Code Online (Sandbox Code Playgroud)

这将显示以下结果(Oracle JDK 10.0.1):

sequential: reduce=-1275
1 threads : reduce=325
2 threads : reduce=-175
3 threads : reduce=-25
4 threads : reduce=75
5 threads : reduce=-25
Run Code Online (Sandbox Code Playgroud)

这表明结果取决于参与reduce计算的线程数。

笔记:

  • 有趣的是,一个线程的顺序减少和并行减少不会导致相同的结果。我找不到很好的解释。
  • 从我的实验来看,相同的Stream内容和相同的线程数在多次运行时总是会导致相同的减少值。我想这是因为并行流使用确定性的Spliterator.
  • 我不会使用 Boyarsky&Selikoff OCP8 书籍示例,因为流太小 (1,2,3,4,5,6) 并且(在我的机器上)ForkJoinPool为 1,2,3,4 的a 产生相同的减少值 3或 5 个线程。
  • 并行流的默认线程数是可用 CPU 内核数。这就是为什么您可能不会在每台机器上都得到相同的 reduce 结果。

身份

对于identity,正如 Eran 在“XwXoXlXf”示例中所写的那样,有 4 个线程,每个线程将使用identity作为一种String前缀开始。但请注意:虽然 OCP 书建议""0是有效的identity,但它取决于累加器/组合器功能。例如:

  • 0identity对累加器有效(a,b)->a+b(因为a+0=a
  • 1identity对累加器有效(a,b)->a*b(因为a*1=a,但0无效,因为a*0=0!)