Chr*_*ies 21 java parallel-processing concurrency java-8 java-stream
考虑以下情况:我们使用Java 8并行流来执行并行forEach循环,例如,
IntStream.range(0,20).parallel().forEach(i -> { /* work done here */})
Run Code Online (Sandbox Code Playgroud)
并行线程的数量由系统属性"java.util.concurrent.ForkJoinPool.common.parallelism"控制,通常等于处理器的数量.
现在假设我们想限制特定工作的并行执行次数 - 例如因为该部分是内存密集型而内存约束意味着并行执行的限制.
限制并行执行的一种明显而优雅的方法是使用信号量(这里建议),例如,下面的代码片段将并行执行的数量限制为5:
final Semaphore concurrentExecutions = new Semaphore(5);
IntStream.range(0,20).parallel().forEach(i -> {
concurrentExecutions.acquireUninterruptibly();
try {
/* WORK DONE HERE */
}
finally {
concurrentExecutions.release();
}
});
Run Code Online (Sandbox Code Playgroud)
这很好用!
但是:在worker(at /* WORK DONE HERE */
)中使用任何其他并行流可能会导致死锁.
对我来说,这是一个意外的行为.
说明:由于Java流使用ForkJoin池,因此内部forEach正在分叉,并且连接似乎正在等待.但是,这种行为仍然是出乎意料的.请注意,如果设置"java.util.concurrent.ForkJoinPool.common.parallelism"
为1 ,并行流甚至可以工作.
另请注意,如果存在内部并行forEach,则它可能不透明.
问题: 这种行为是否符合Java 8规范(在这种情况下,它意味着禁止在并行流工作者中使用信号量)或者这是一个错误?
为方便起见:下面是一个完整的测试用例.除了"true,true"之外,两个布尔值的任何组合都有效,这会导致死锁.
澄清:为了明确这一点,让我强调一个方面:acquire
信号量不会发生死锁.请注意,代码包含
如果该段代码使用另一个并行流,则死锁发生在2. 然后在OTHER流内发生死锁.因此,似乎不允许一起使用嵌套并行流和阻塞操作(如信号量)!
请注意,记录并行流使用ForkJoinPool并且ForkJoinPool和Semaphore属于同一个包 - java.util.concurrent
(因此可以预期它们可以很好地互操作).
/*
* (c) Copyright Christian P. Fries, Germany. All rights reserved. Contact: email@christian-fries.de.
*
* Created on 03.05.2014
*/
package net.finmath.experiments.concurrency;
import java.util.concurrent.Semaphore;
import java.util.stream.IntStream;
/**
* This is a test of Java 8 parallel streams.
*
* The idea behind this code is that the Semaphore concurrentExecutions
* should limit the parallel executions of the outer forEach (which is an
* <code>IntStream.range(0,numberOfTasks).parallel().forEach</code> (for example:
* the parallel executions of the outer forEach should be limited due to a
* memory constrain).
*
* Inside the execution block of the outer forEach we use another parallel stream
* to create an inner forEach. The number of concurrent
* executions of the inner forEach is not limited by us (it is however limited by a
* system property "java.util.concurrent.ForkJoinPool.common.parallelism").
*
* Problem: If the semaphore is used AND the inner forEach is active, then
* the execution will be DEADLOCKED.
*
* Note: A practical application is the implementation of the parallel
* LevenbergMarquardt optimizer in
* {@link http://finmath.net/java/finmath-lib/apidocs/net/finmath/optimizer/LevenbergMarquardt.html}
* In one application the number of tasks in the outer and inner loop is very large (>1000)
* and due to memory limitation the outer loop should be limited to a small (5) number
* of concurrent executions.
*
* @author Christian Fries
*/
public class ForkJoinPoolTest {
public static void main(String[] args) {
// Any combination of the booleans works, except (true,true)
final boolean isUseSemaphore = true;
final boolean isUseInnerStream = true;
final int numberOfTasksInOuterLoop = 20; // In real applications this can be a large number (e.g. > 1000).
final int numberOfTasksInInnerLoop = 100; // In real applications this can be a large number (e.g. > 1000).
final int concurrentExecusionsLimitInOuterLoop = 5;
final int concurrentExecutionsLimitForStreams = 10;
final Semaphore concurrentExecutions = new Semaphore(concurrentExecusionsLimitInOuterLoop);
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",Integer.toString(concurrentExecutionsLimitForStreams));
System.out.println("java.util.concurrent.ForkJoinPool.common.parallelism = " + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));
IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {
if(isUseSemaphore) {
concurrentExecutions.acquireUninterruptibly();
}
try {
System.out.println(i + "\t" + concurrentExecutions.availablePermits() + "\t" + Thread.currentThread());
if(isUseInnerStream) {
runCodeWhichUsesParallelStream(numberOfTasksInInnerLoop);
}
else {
try {
Thread.sleep(10*numberOfTasksInInnerLoop);
} catch (Exception e) {
}
}
}
finally {
if(isUseSemaphore) {
concurrentExecutions.release();
}
}
});
System.out.println("D O N E");
}
/**
* Runs code in a parallel forEach using streams.
*
* @param numberOfTasksInInnerLoop Number of tasks to execute.
*/
private static void runCodeWhichUsesParallelStream(int numberOfTasksInInnerLoop) {
IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
try {
Thread.sleep(10);
} catch (Exception e) {
}
});
}
}
Run Code Online (Sandbox Code Playgroud)
Bri*_*etz 48
每当您将问题分解为任务时,这些任务可以在其他任务上被阻止,并尝试在有限的线程池中执行它们,您就有可能遇到池引发的死锁.请参见练习 8.1中的Java并发.
这无疑是一个错误 - 在你的代码中.您正在使用将阻止等待同一池中其他任务的结果的任务来填充FJ池.有时候你会很幸运,事情也不会陷入僵局(就像并非所有的锁定错误都会导致死锁一直存在),但从根本上说,你在这里非常薄弱的滑冰.
我在探查器 (VisualVM) 中运行了您的测试,我同意:线程正在等待信号量并等待 F/J 池中的 aWaitJoin()。
\n\n该框架在 join() 方面存在严重问题。我\xe2\x80\x99 四年来一直在写一篇关于这个框架的评论。基本连接问题从这里开始。
\n\naWaitJoin() 也有类似的问题。您可以自己仔细阅读代码。当框架到达工作双端队列的底部时,它会发出 wait()。归根结底是这个框架无法进行上下文切换。
\n\n有一种方法可以让这个框架为停滞的线程创建补偿线程。您需要实现 ForkJoinPool.ManagedBlocker 接口。你怎么能做到这一点,我不知道。您\xe2\x80\x99正在运行带有流的基本API。您\xe2\x80\x99 没有实现 Streams API 并编写自己的代码。
\n\n我坚持上面的评论:一旦将并行性移交给 API,您就放弃了控制并行机制内部工作的能力。API 不存在任何错误(除了使用错误的并行操作框架之外)。问题在于信号量或任何其他用于控制 API 内并行性的方法都是危险的想法。
\n 归档时间: |
|
查看次数: |
5721 次 |
最近记录: |