Rob*_*roj 12 java parallel-processing java-8 java-stream
比方说,我有一个List<Integer> ints = new ArrayList<>();
,我想将值添加到它,并使用比较并行执行的结果forEach()
和Collectors.toList()
.
首先,我将一些来自顺序IntStream和forEach的值添加到此列表中:
IntStream.range(0,10).boxed().forEach(ints::add);
Run Code Online (Sandbox Code Playgroud)
我得到了正确的结果:
ints ==> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Run Code Online (Sandbox Code Playgroud)
现在我.clear()
在列表中并行执行相同的操作:
IntStream.range(0,10).parallel().boxed().forEach(ints::add);
Run Code Online (Sandbox Code Playgroud)
现在由于多线程,我得到了错误的结果:
ints ==> [6, 5, 8, 9, 7, 2, 4, 3, 1, 0]
Run Code Online (Sandbox Code Playgroud)
现在我切换到收集相同的整数流:
IntStream.range(0,10).parallel().boxed().collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
我得到了正确的结果:
ints ==> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Run Code Online (Sandbox Code Playgroud)
问题:
为什么两个并行执行产生不同的Collector
结果?为什么产生正确的结果?
如果forEach
产生随机结果,也Collector
应该.我没有指定任何排序,我认为他在内部添加到我手动使用的列表forEach
.由于他并行执行,因此他的add
方法应该以未指定的顺序获取值.测试完成了JShell.
编辑:这里没有重复.我理解相关的问题.收集器为什么产生正确的结果?如果他将产生另一个随机结果,我不会问.
tep*_*pic 11
该collect
操作将如果产生无序的输出Collector
,你通过它有不同的特点.也就是说,如果设置了CONCURRENT
和UNORDERED
标志(请参阅参考资料Collector.characteristics()
).
引擎盖Collectors.toList()
下构建的Collector
大致相当于:
Collector.of(
// Supplier of accumulators
ArrayList::new,
// Accumulation operation
List::add,
// Combine accumulators
(left, right) -> {
left.addAll(right);
return left;
}
)
Run Code Online (Sandbox Code Playgroud)
一些日志记录显示了collect
操作将保持线程安全性和流顺序的长度:
Collector.of(
() -> {
System.out.printf("%s supplying\n", Thread.currentThread().getName());
return new ArrayList<>();
},
(l, o) -> {
System.out.printf("%s accumulating %s to %s\n", Thread.currentThread().getName(), o, l);
l.add(o);
},
(l1, l2) -> {
System.out.printf("%s combining %s & %s\n", Thread.currentThread().getName(), l1, l2);
l1.addAll(l2);
return l1;
}
)
Run Code Online (Sandbox Code Playgroud)
日志:
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-0 accumulating 2 to []
ForkJoinPool-1-worker-1 accumulating 6 to []
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-0 accumulating 4 to []
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-1 accumulating 5 to []
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-0 accumulating 3 to []
ForkJoinPool-1-worker-0 combining [3] & [4]
ForkJoinPool-1-worker-0 combining [2] & [3, 4]
ForkJoinPool-1-worker-1 combining [5] & [6]
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-0 accumulating 1 to []
ForkJoinPool-1-worker-1 accumulating 8 to []
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-1 accumulating 9 to []
ForkJoinPool-1-worker-1 combining [8] & [9]
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-1 accumulating 7 to []
ForkJoinPool-1-worker-1 combining [7] & [8, 9]
ForkJoinPool-1-worker-1 combining [5, 6] & [7, 8, 9]
ForkJoinPool-1-worker-0 accumulating 0 to []
ForkJoinPool-1-worker-0 combining [0] & [1]
ForkJoinPool-1-worker-0 combining [0, 1] & [2, 3, 4]
ForkJoinPool-1-worker-0 combining [0, 1, 2, 3, 4] & [5, 6, 7, 8, 9]
Run Code Online (Sandbox Code Playgroud)
您可以看到从流中读取的每个内容都写入新的累加器,并且它们经过仔细组合以维护顺序.
如果我们设置CONCURRENT
和UNORDERED
特征标志,收集方法可以自由采取捷径; 只分配一个累加器,不需要有序组合.
使用:
Collector.of(
() -> {
System.out.printf("%s supplying\n", Thread.currentThread().getName());
return Collections.synchronizedList(new ArrayList<>());
},
(l, o) -> {
System.out.printf("%s accumulating %s to %s\n", Thread.currentThread().getName(), o, l);
l.add(o);
},
(l1, l2) -> {
System.out.printf("%s combining %s & %s\n", Thread.currentThread().getName(), l1, l2);
l1.addAll(l2);
return l1;
},
Characteristics.CONCURRENT,
Characteristics.UNORDERED
)
Run Code Online (Sandbox Code Playgroud)
日志:
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-1 accumulating 6 to []
ForkJoinPool-1-worker-0 accumulating 2 to [6]
ForkJoinPool-1-worker-1 accumulating 5 to [6, 2]
ForkJoinPool-1-worker-0 accumulating 4 to [6, 2, 5]
ForkJoinPool-1-worker-0 accumulating 3 to [6, 2, 5, 4]
ForkJoinPool-1-worker-0 accumulating 1 to [6, 2, 5, 4, 3]
ForkJoinPool-1-worker-0 accumulating 0 to [6, 2, 5, 4, 3, 1]
ForkJoinPool-1-worker-1 accumulating 8 to [6, 2, 5, 4, 3, 1, 0]
ForkJoinPool-1-worker-0 accumulating 7 to [6, 2, 5, 4, 3, 1, 0, 8]
ForkJoinPool-1-worker-1 accumulating 9 to [6, 2, 5, 4, 3, 1, 0, 8, 7]
Run Code Online (Sandbox Code Playgroud)