Muh*_*jer 2 java parallel-processing multithreading java-stream
我正在对非常大的每个元素应用操作LinkedList<LinkedList<Double>>:
list.stream().map(l -> l.stream().filter(d ->
(Collections.max(l) - d) < 5)
.collect(Collectors.toCollection(LinkedList::new))).collect(Collectors.toCollection(LinkedList::new));
Run Code Online (Sandbox Code Playgroud)
在我的计算机(四核)上,并行流似乎比使用顺序流更快:
list.parallelStream().map(l -> l.parallelStream().filter(d ->
(Collections.max(l) - d) < 5)
.collect(Collectors.toCollection(LinkedList::new))).collect(Collectors.toCollection(LinkedList::new));
Run Code Online (Sandbox Code Playgroud)
然而,并不是每台计算机都是多核的。我的问题是,在单处理器计算机上使用并行流会比使用顺序流明显慢吗?
这是高度特定于实现的,但通常,对于大多数操作,并行流将通过不同的代码路径,这意味着执行额外的工作,但同时,线程池将配置为 CPU 内核的数量。
例如,如果您运行以下程序
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "1");
System.out.println("Parallelism: "+ForkJoinPool.getCommonPoolParallelism());
Set<Thread> threads = ConcurrentHashMap.newKeySet();
for(int run=0; run<2; run++) {
IntStream stream = IntStream.range(0, 100);
if(run==1) {
stream = stream.parallel();
System.out.println("Parallel:");
}
int chunks = stream
.mapToObj(i->Thread.currentThread())
.collect(()->new int[]{1}, (a,t)->threads.add(t), (a,b)->a[0]+=b[0])[0];
System.out.println("processed "+chunks+" chunk(s) with "+threads.size()+" thread(s)");
}
Run Code Online (Sandbox Code Playgroud)
它会打印类似的东西
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "1");
System.out.println("Parallelism: "+ForkJoinPool.getCommonPoolParallelism());
Set<Thread> threads = ConcurrentHashMap.newKeySet();
for(int run=0; run<2; run++) {
IntStream stream = IntStream.range(0, 100);
if(run==1) {
stream = stream.parallel();
System.out.println("Parallel:");
}
int chunks = stream
.mapToObj(i->Thread.currentThread())
.collect(()->new int[]{1}, (a,t)->threads.add(t), (a,b)->a[0]+=b[0])[0];
System.out.println("processed "+chunks+" chunk(s) with "+threads.size()+" thread(s)");
}
Run Code Online (Sandbox Code Playgroud)
可以看到拆分工作负载的效果,而拆分为配置并行度的四倍并非巧合,而且只涉及一个线程,因此这里没有发生线程间通信。在这种情况下,JVM 的优化器是否会检测此操作的单线程性质并消除同步成本,与其他任何事情一样,是一个实现细节。
总而言之,开销不是很大,也不会随着实际工作量而扩展,所以如果实际工作量足够大,可以从 SMP 机器上的并行处理中受益,那么单核上的开销部分将可以忽略不计机器。
但是如果您关心性能,您还应该查看代码的其他方面。
通过Collections.max(l)对 的每个元素重复类似的操作l,您将两个线性操作组合成一个具有二次时间复杂度的操作。只需执行一次此操作很容易:
List<List<Double>> result =
list.parallelStream()
.map(l -> {
double limit = Collections.max(l)-5;
return l.parallelStream()
.filter(d -> limit < d)
.collect(Collectors.toCollection(LinkedList::new));
})
.collect(Collectors.toCollection(LinkedList::new));
Run Code Online (Sandbox Code Playgroud)
根据列表的大小,这种将二次运算变为线性的微小变化的影响可能远远大于将处理时间除以 CPU 内核数(在最佳情况下)。
另一个考虑因素是您是否真的需要一个LinkedList. 对于大多数实际用途, a 的LinkedList性能比 an 差,ArrayList如果您不需要可变性,您可以只使用toList()收集器并让 JRE 返回它可以提供的最佳列表......
List<List<Double>> result =
list.parallelStream()
.map(l -> {
double limit = Collections.max(l)-5;
return l.parallelStream()
.filter(d -> limit < d)
.collect(Collectors.toList());
})
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
请记住,在更改性能特征后,建议重新检查并行化是否仍然有任何好处。还应该单独检查两个流操作。通常,如果外部流具有良好的并行化,将内部流变为并行并不会提高整体性能。
此外,如果源列表是随机访问列表而不是LinkedLists ,则并行流的好处会更高。
| 归档时间: |
|
| 查看次数: |
1971 次 |
| 最近记录: |