Tav*_*nes 15 java concurrency multithreading java.util.concurrent fork-join
一个线程死锁饥饿如果池中的所有线程都在等待同一个池中排队的任务完成发生在一个正常的线程池. ForkJoinPool通过从join()调用内部窃取其他线程的工作来避免这个问题,而不是简单地等待.例如:
private static class ForkableTask extends RecursiveTask<Integer> {
private final CyclicBarrier barrier;
ForkableTask(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
protected Integer compute() {
try {
barrier.await();
return 1;
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
}
@Test
public void testForkJoinPool() throws Exception {
final int parallelism = 4;
final ForkJoinPool pool = new ForkJoinPool(parallelism);
final CyclicBarrier barrier = new CyclicBarrier(parallelism);
final List<ForkableTask> forkableTasks = new ArrayList<>(parallelism);
for (int i = 0; i < parallelism; ++i) {
forkableTasks.add(new ForkableTask(barrier));
}
int result = pool.invoke(new RecursiveTask<Integer>() {
@Override
protected Integer compute() {
for (ForkableTask task : forkableTasks) {
task.fork();
}
int result = 0;
for (ForkableTask task : forkableTasks) {
result += task.join();
}
return result;
}
});
assertThat(result, equalTo(parallelism));
}
Run Code Online (Sandbox Code Playgroud)
但是当使用ExecutorService接口到a时ForkJoinPool,工作窃取似乎不会发生.例如:
private static class CallableTask implements Callable<Integer> {
private final CyclicBarrier barrier;
CallableTask(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public Integer call() throws Exception {
barrier.await();
return 1;
}
}
@Test
public void testWorkStealing() throws Exception {
final int parallelism = 4;
final ExecutorService pool = new ForkJoinPool(parallelism);
final CyclicBarrier barrier = new CyclicBarrier(parallelism);
final List<CallableTask> callableTasks = Collections.nCopies(parallelism, new CallableTask(barrier));
int result = pool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int result = 0;
// Deadlock in invokeAll(), rather than stealing work
for (Future<Integer> future : pool.invokeAll(callableTasks)) {
result += future.get();
}
return result;
}
}).get();
assertThat(result, equalTo(parallelism));
}
Run Code Online (Sandbox Code Playgroud)
从粗略看看ForkJoinPool的实现,所有常规ExecutorServiceAPI都是用ForkJoinTasks 实现的,所以我不确定为什么会发生死锁.
Phi*_*ler 30
你几乎回答了自己的问题.解决方案是" ForkJoinPool通过从join()调用内部窃取其他线程的工作来避免此问题"的声明.每当线程因某些其他原因而被阻塞时ForkJoinPool.join(),此工作就不会发生,并且线程只是等待并且什么都不做.
这样做的原因是,在Java中,不可能ForkJoinPool阻止其线程阻塞,而是为其提供其他工作.线程本身需要避免阻塞,而是要求池应该做的工作.这只在ForkJoinTask.join()方法中实现,而不是在任何其他阻塞方法中实现.如果你使用Future内部a ForkJoinPool,你也会看到饥饿僵局.
为什么工作窃取只ForkJoinTask.join()在Java API中实现,而不是在Java API中的任何其他阻塞方法中实现?好吧,有很多这样的阻塞方法(Object.wait(),Future.get()任何并发原语java.util.concurrent,I/O方法等),它们没有任何关系ForkJoinPool,这只是API中的一个任意类,所以向所有人添加特殊情况这些方法设计不好.它还可能导致非常令人惊讶和不希望的效果.想象一下,例如,一个用户将一个任务传递给一个ExecutorService等待a的任务Future,然后发现该任务挂起很长时间Future.get()只是因为正在运行的线程偷了一些其他(长时间运行的)工作项而不是等待Future并立即继续结果可用.一旦线程开始处理另一个任务,它就无法返回到原始任务,直到第二个任务完成.因此,其他阻止方法不会进行工作窃取实际上是一件好事.对于a ForkJoinTask,这个问题不存在,因为主要任务尽快继续并不重要,所有任务一起尽可能高效地处理是很重要的.
ForkJoinPool由于所有相关部分都不公开,因此也无法实现自己的方法来在内部进行工作窃取.
但是,实际上还有第二种方法可以防止饥饿死锁.这称为托管阻止.它不使用工作窃取(以避免上面提到的问题),但也需要阻塞的线程积极配合线程池.使用托管阻塞,线程告诉线程池在调用潜在阻塞方法之前可能会阻塞它,并在阻塞方法完成时通知池.然后线程池知道存在饥饿死锁的风险,并且如果其所有线程当前处于某些阻塞操作中并且还有其他任务要执行,则可能产生其他线程.请注意,由于额外线程的开销,这比工作窃取效率低.如果使用普通期货和托管阻塞实现递归并行算法而不是使用ForkJoinTask和工作窃取,则额外线程的数量会变得非常大(因为在算法的"除法"阶段,将创建并给出许多任务到立即阻塞并等待子任务结果的线程).但是,仍然会阻止饥饿死锁,并且它避免了任务必须等待很长时间的问题,因为它的线程同时开始处理另一个任务.
在ForkJoinPoolJava的支持托管阻塞.要使用它,需要实现接口ForkJoinPool.ManagedBlocker,以便从block该接口的方法中调用任务要执行的潜在阻塞方法.然后任务可能不会直接调用阻塞方法,而是需要调用静态方法ForkJoinPool.managedBlock(ManagedBlocker).此方法在阻塞之前和之后处理与线程池的通信.它也适用于当前任务未在a内执行ForkJoinPool,然后它只调用阻塞方法.
我在Java API(Java 7)中找到的唯一实际使用托管阻塞的地方是类Phaser.(这个类是一个同步障碍,如互斥锁和锁存器,但更灵活,更强大.)因此,与任务Phaser内部同步ForkJoinPool应该使用托管阻塞,并且可以避免饥饿死锁(但ForkJoinTask.join()仍然更可取,因为它使用工作窃取而不是托管阻塞) .无论您是ForkJoinPool直接使用还是通过其ExecutorService界面,这都有效.但是,如果您使用ExecutorService类所创建的其他任何内容,它将无法工作Executors,因为这些不支持托管阻止.
| 归档时间: |
|
| 查看次数: |
6393 次 |
| 最近记录: |