Giu*_*lio 4 java parallel-processing concurrency forkjoinpool java-stream
使用 的最佳实践是什么.stream().parallel()?
例如,如果您有一堆阻塞 I/O 调用并且您想检查 if .anyMatch(...),那么并行执行此操作似乎是明智之举。
示例代码:
public boolean hasAnyRecentReference(JobId jobid) {
<...>
return pendingJobReferences.stream()
.parallel()
.anyMatch(pendingRef -> {
JobReference readReference = pendingRef.sync();
Duration referenceAge = timeService.timeSince(readReference.creationTime());
return referenceAge.lessThan(maxReferenceAge)
});
}
Run Code Online (Sandbox Code Playgroud)
乍一看这看起来很合理,因为我们可以同时执行多个阻塞读取,因为我们只关心匹配的任何一个,而不是一个接一个地检查(所以如果每次读取需要 50 毫秒,我们只需要等待 ( 50ms * expectedNumberOfNonRecentRefs ) / numThreads)。
在生产环境中引入此代码是否会对代码库的其他部分产生任何不可预见的性能影响?
编辑:正如@edharned 指出的,.parallel()现在使用CountedCompleter而不是调用.join(),这有其自身的问题,埃德在http://coopsoft.com/ar/Calamity2Article.htmlWhat is currently being done?部分下也解释了这一点。
我相信下面的信息对于理解为什么 fork-join 框架很棘手以及.parallel()在结论中提出的替代方案仍然有用。
虽然代码的精神是正确的,但实际代码可以对所有使用的代码产生系统范围的影响,.parallel()即使这根本不明显。
不久前,我发现了一篇建议不要这样做的文章:https : //dzone.com/articles/think-twice-using-java-8,但直到最近我才深入挖掘。
这些是我在阅读了一堆书后的想法:
.parallel()在 Java 中使用ForkJoinPool.commonPool()它是ForkJoinPool所有流共享的单例(ForkJoinPool.commonPool()是一个公共静态方法,所以理论上其他库/代码部分可以使用它)ForkJoinPool 实现工作窃取,除了共享队列外,还具有每线程队列
cached线程池不也进行工作窃取(即使有些参考文献称缓存线程池的工作共享)?事实证明,在使用空闲一词时,似乎有些术语含糊不清:
cached线程池中,线程只有在完成任务后才空闲。它并没有,如果它被阻止在等待一个阻塞调用变得无所事事在forkjoin线程池中,线程在完成其任务或调用.join()子任务上的方法(这是一个特殊的阻塞调用)时处于空闲状态。
当.join()在子任务上调用时,线程在等待该子任务完成时变为空闲。在空闲时,它会尝试执行任何其他可用的任务,即使它在另一个线程的队列中(它会窃取工作)。
[这是重要的一点]一旦它找到另一个要执行的任务,它必须在恢复其原始执行之前完成它,即使它正在等待的子任务完成而线程仍在执行被盗任务。
[这也很重要]这种窃取工作的行为仅适用于调用.join(). 如果一个线程在其他事情上被阻塞,比如 I/O,它就会空闲(即它不会窃取工作)。
Java 流不允许您提供自定义 ForkJoinPool 但https://github.com/amaembo/streamex可以
我花了一段时间才理解 的含义2.3.2,所以我将举一个简单的例子来帮助说明这个问题:
注意:这些是虚拟示例,但是您可以通过使用流在没有意识到的情况下进入等效情况,流在内部执行 fork join 操作。
此外,我将使用极其简化的伪代码,仅用于说明 .parallel() 问题,但在其他情况下不一定有意义。
假设我们正在实现归并排序
merge_sort(list):
left, right = split(list)
leftTask = mergeSortTask(left).fork()
rightTask = mergeSortTaks(right).fork()
return merge(leftTask.join(), rightTask.join())
Run Code Online (Sandbox Code Playgroud)
现在假设我们有另一段代码执行以下操作:
dummy_collect_results(queriesIds):
pending_results = []
for id in queriesIds:
pending_results += longBlockingIOTask(id).fork()
// do more stuff
Run Code Online (Sandbox Code Playgroud)
这里会发生什么?
当您编写合并排序代码时,您认为排序调用不执行任何 I/O,因此它们的性能应该是非常确定的,对吗?
对。您可能没有想到的是,由于该dummy_collect_results方法创建了一堆长时间运行和阻塞的子任务,当执行合并排序任务的线程阻塞在 上.join(),等待子任务完成时,它们可能会开始执行其中一个长时间阻塞的子任务。
这是不好的,因为如上所述,一旦长阻塞(在 I/O 上,而不是.join()调用,因此线程不会再次空闲)被盗,它必须完成,无论子任务是否线程正在等待通过.join()完成而阻塞 I/O。
这使得合并排序任务的执行不再具有确定性,因为执行这些任务的线程最终可能会窃取完全位于其他地方的代码生成的 I/O 密集型任务。
这也非常可怕且难以捕捉,因为您本可以在.parallel()整个代码库中使用而不会出现任何问题,而且所需要的只是一个类,它在使用时引入了长时间运行的任务,.parallel()并且突然之间代码库的所有其他部分可能获得不一致的性能。
所以我的结论是:
.parallel()如果您可以保证将要在代码中的任何位置创建的所有任务都很短,那就可以了.parallel()除非您知道,否则可能会对系统范围的性能产生不明显的影响(例如,如果您稍后添加一段使用.parallel()并执行长时间任务的代码,您可能会影响所有使用的代码的性能.parallel())2.您最好.parallel()完全避免使用ExecutorCompletionService或使用https://github.com/amaembo/streamex,它允许您提供自己的ForkJoinPool(允许更多隔离)。更好的是,您可以使用https://github.com/palantir/streams/blob/1.9.1/src/main/java/com/palantir/common/streams/MoreStreams.java#L53,它给你更好的 -对并发机制的粒度控制。| 归档时间: |
|
| 查看次数: |
1609 次 |
| 最近记录: |