Mau*_*ice 8 java parallel-processing reduce java-8 java-stream
当reduce()在并行流上使用该操作时,OCP考试书指出reduce()参数必须遵守某些原则.这些论点如下:
(a op b) op c等于a op (b op c).u和t combiner.apply(u, accumulator.apply(identity, t))等于accumulator.apply(u,t).考试书提供了两个例子来说明这些原则,请参阅下面的代码:
关联的示例:
System.out.println(Arrays,asList(1,2,3,4,5,6))
.parallelStream()
.reduce(0,(a,b) -> (a-b))); //NOT AN ASSOCIATIVE ACCUMULATOR
Run Code Online (Sandbox Code Playgroud)
OCP书中对此有何说法:
它可能会输出-21,3或其他一些值,因为累加器函数违反了associativity属性.
身份要求的示例:
System.out.println(Arrays.asList("w","o","l","f"))
.parallelStream()
.reduce("X", String::concat));
Run Code Online (Sandbox Code Playgroud)
OCP书中对此有何说法:
如果我们使用的身份参数不是真正的身份值,您可以看到其他问题.它可以输出XwXoXlXf.作为并行过程的一部分,标识将应用于流中的多个元素,从而导致非常意外的数据.
我不明白这些例子.使用累加器示例,累加器以0 -1 = -1然后-1 -2开始,其中= -3然后-6等等一直到-21.我明白,因为生成的arraylist不同步,结果可能是不可预测的,因为竞争条件等的可能性,但为什么累加器不关联?Woulden也不会(a+b)导致不可预测的结果?我真的没有看到示例中使用的累加器有什么问题,以及为什么它不是关联的,但是我仍然不能完全理解关联原则是什么.
我也不了解身份的例子.据我所知,如果4个独立的线程同时开始与身份一起累积,那么结果确实可能是XwXoXlXf,但这与身份参数本身有什么关系呢?究竟什么是适当的身份才能使用呢?
我想知道是否有人可以更多地了解这些原则.
谢谢
为什么累加器不具有关联性?
它不是关联的,因为减法运算的顺序决定了最终结果。
如果运行serial Stream,将得到以下预期结果:
0 - 1 - 2 - 3 - 4 - 5 - 6 = -21
Run Code Online (Sandbox Code Playgroud)
另一方面,对于parallel Stream,工作被拆分为多个线程。例如,如果reduce在6个线程上并行执行,然后合并中间结果,则可以获得不同的结果:
0 - 1 0 - 2 0 - 3 0 - 4 0 - 5 0 - 6
-1 -2 -3 -4 -5 -6
-1 - (-2) -3 - (-4) -5 - (-6)
1 1 1
1 - 1
0 - 1
-1
Run Code Online (Sandbox Code Playgroud)
或者,举一个简短的例子:
(1 - 2) - 3 = -4
1 - (2 - 3) = 2
Run Code Online (Sandbox Code Playgroud)
因此,减法不具有关联性。
另一方面,a+b不会引起相同的问题,因为加法是关联运算符(即(a+b)+c == a+(b+c))。
身份示例的问题在于,当在多个线程上并行执行reduce时,会将“ X”附加到每个中间结果的开头。
那么要使用的正确身份到底是什么?
如果将标识值更改为"":
System.out.println(Arrays.asList("w","o","l","f"))
.parallelStream()
.reduce("", String::concat));
Run Code Online (Sandbox Code Playgroud)
您会得到“狼”而不是“ XwXoXlXf”。
让我举两个例子。首先是身份被破坏的地方:
int result = Stream.of(1, 2, 3, 4, 5, 6)
.parallel()
.reduce(10, (a, b) -> a + b);
System.out.println(result); // 81 on my run
Run Code Online (Sandbox Code Playgroud)
基本上,您已经违反了以下规则:The identity value must be an identity for the accumulator function. This means that for all u, accumulator(identity, u) is equal to u。
或更简单地说,让我们看看该规则是否适用于Stream中的一些随机数据:
Integer identity = 10;
BinaryOperator<Integer> combiner = (x, y) -> x + y;
boolean identityRespected = combiner.apply(identity, 1) == 1;
System.out.println(identityRespected); // prints false
Run Code Online (Sandbox Code Playgroud)
第二个例子:
/**
* count letters, adding a bit more all the time
*/
private static int howMany(List<String> tokens) {
return tokens.stream()
.parallel()
.reduce(0, // identity
(i, s) -> { // accumulator
return s.length() + i;
}, (left, right) -> { // combiner
return left + right + left; // notice the extra left here
});
}
Run Code Online (Sandbox Code Playgroud)
然后使用以下命令调用它:
List<String> left = Arrays.asList("aa", "bbb", "cccc", "ddddd", "eeeeee");
List<String> right = Arrays.asList("aa", "bbb", "cccc", "ddddd", "eeeeee", "");
System.out.println(howMany(left)); // 38 on my run
System.out.println(howMany(right)); // 50 on my run
Run Code Online (Sandbox Code Playgroud)
基本上,您已经违反了此规则:Additionally, the combiner function must be compatible with the accumulator function或在代码中:
// this must hold!
// combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
Integer identity = 0;
String t = "aa";
Integer u = 3; // "bbb"
BiFunction<Integer, String, Integer> accumulator = (Integer i, String s) -> i + s.length();
BinaryOperator<Integer> combiner = (left, right) -> left + right + left;
int first = accumulator.apply(identity, t); // 2
int second = combiner.apply(u, first); // 3 + 2 + 3 = 8
Integer shouldBe8 = accumulator.apply(u, t);
System.out.println(shouldBe8 == second); // false
Run Code Online (Sandbox Code Playgroud)
虽然这个问题已经得到了回答和接受,但我认为可以用更简单、更实用的方式来回答。
如果您没有有效identity的关联累加器/组合器,则reduce操作的结果将取决于:
Stream内容Stream让我们尝试一个非关联累加器/组合器的例子(基本上,我们通过改变线程数来减少一个序列中的 50 个数字的列表并并行):
System.out.println("sequential: reduce="+
IntStream.rangeClosed(1, 50).boxed()
.reduce(
0,
(a,b)->a-b,
(a,b)->a-b));
for (int n=1; n<6; n++) {
ForkJoinPool pool = new ForkJoinPool(n);
final int finalN = n;
try {
pool.submit(()->{
System.out.println(finalN+" threads : reduce="+
IntStream.rangeClosed(1, 50).boxed()
.parallel()
.reduce(
0,
(a,b)->a-b,
(a,b)->a-b));
}).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
pool.shutdown();
}
}
Run Code Online (Sandbox Code Playgroud)
这将显示以下结果(Oracle JDK 10.0.1):
sequential: reduce=-1275
1 threads : reduce=325
2 threads : reduce=-175
3 threads : reduce=-25
4 threads : reduce=75
5 threads : reduce=-25
Run Code Online (Sandbox Code Playgroud)
这表明结果取决于参与reduce计算的线程数。
笔记:
Stream内容和相同的线程数在多次运行时总是会导致相同的减少值。我想这是因为并行流使用确定性的Spliterator.ForkJoinPool为 1,2,3,4 的a 产生相同的减少值 3或 5 个线程。对于identity,正如 Eran 在“XwXoXlXf”示例中所写的那样,有 4 个线程,每个线程将使用identity作为一种String前缀开始。但请注意:虽然 OCP 书建议""和0是有效的identity,但它取决于累加器/组合器功能。例如:
0identity对累加器有效(a,b)->a+b(因为a+0=a)1identity对累加器有效(a,b)->a*b(因为a*1=a,但0无效,因为a*0=0!)| 归档时间: |
|
| 查看次数: |
747 次 |
| 最近记录: |