Eri*_*ric 12 java concurrency threadpoolexecutor
我的问题与此问题密切相关.正如在那里发布的那样,我希望主线程等到工作队列为空并且所有任务都已完成.然而,在我的情况下,问题是每个任务可以递归地导致提交新任务以进行处理.这使收集所有这些任务的未来变得有点尴尬.
我们当前的解决方案使用忙等待循环来等待终止:
do { //Wait until we are done the processing
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} while (!executor.getQueue().isEmpty()
|| numTasks.longValue() > executor.getCompletedTaskCount());
Run Code Online (Sandbox Code Playgroud)
numTasks是一个在创建每个新任务时增加的值.这有效但我认为由于忙碌的等待而不是很好.我想知道是否有一种好方法可以使主线程同步等待,直到被明确唤醒.
非常感谢你的所有建议!
最后,我选择了一些我认为相当简单的东西.我发现CountDownLatch几乎就是我所需要的.它会阻塞,直到计数器达到0.唯一的问题是它只能倒计时,而不是向上,因此在动态设置中不起作用,我可以在任务中提交新任务.因此,我实现了一个CountLatch提供附加功能的新类.(见下文)这个课我然后使用如下.
主线程调用latch.awaitZero(),阻塞直到锁存器达到0.
任何线索,调用之前executor.execute(..)调用latch.increment().
在完成之前的任何任务都会打电话latch.decrement().
当最后一个任务终止时,计数器将达到0,从而释放主线程.
欢迎提供进一步的建议和反馈!
public class CountLatch {
@SuppressWarnings("serial")
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected int acquireNonBlocking(int acquires) {
// increment count
for (;;) {
int c = getState();
int nextc = c + 1;
if (compareAndSetState(c, nextc))
return 1;
}
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
public CountLatch(int count) {
this.sync = new Sync(count);
}
public void awaitZero() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean awaitZero(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void increment() {
sync.acquireNonBlocking(1);
}
public void decrement() {
sync.releaseShared(1);
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
Run Code Online (Sandbox Code Playgroud)
需要注意的是increment()/ decrement()呼叫可以被封装到一个自定义的Executor子类,因为有人建议,例如,由萨米尔霍宁,或beforeExecute与afterExecute作为被IMPL建议.看这里:
public class CountingThreadPoolExecutor extends ThreadPoolExecutor {
protected final CountLatch numRunningTasks = new CountLatch(0);
public CountingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public void execute(Runnable command) {
numRunningTasks.increment();
super.execute(command);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
numRunningTasks.decrement();
super.afterExecute(r, t);
}
/**
* Awaits the completion of all spawned tasks.
*/
public void awaitCompletion() throws InterruptedException {
numRunningTasks.awaitZero();
}
/**
* Awaits the completion of all spawned tasks.
*/
public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
numRunningTasks.awaitZero(timeout, unit);
}
}
Run Code Online (Sandbox Code Playgroud)
小智 5
Java 7提供了一个适合这个名为Phaser的用例的同步器.它是CountDownLatch和CyclicBarrier的可重用混合体,可以增加和减少注册方的数量(类似于可递增的CountDownLatch).
在此方案中使用相位器的基本模式是在创建时使用移相器注册任务,并在完成时到达.当到达方的数量与登记的数量匹配时,相位器"前进"到下一阶段,在进行时通知任何等待线程.
这是我创建的等待递归任务完成的示例.为了演示目的,它天真地找到Fibonacci序列的前几个数字:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicLong;
/**
* An example of using a Phaser to wait for the completion of recursive tasks.
* @author Voxelot
*/
public class PhaserExample {
/** Workstealing threadpool with reduced queue contention. */
private static ForkJoinPool executors;
/**
* @param args the command line arguments
*/
public static void main(String[] args) throws InterruptedException {
executors = new ForkJoinPool();
List<Long> sequence = new ArrayList<>();
for (int i = 0; i < 20; i++) {
sequence.add(fib(i));
}
System.out.println(sequence);
}
/**
* Computes the nth Fibonacci number in the Fibonacci sequence.
* @param n The index of the Fibonacci number to compute
* @return The computed Fibonacci number
*/
private static Long fib(int n) throws InterruptedException {
AtomicLong result = new AtomicLong();
//Flexible sychronization barrier
Phaser phaser = new Phaser();
//Base task
Task initialTask = new Task(n, result, phaser);
//Register fib(n) calling thread
phaser.register();
//Submit base task
executors.submit(initialTask);
//Make the calling thread arrive at the synchronization
//barrier and wait for all future tasks to arrive.
phaser.arriveAndAwaitAdvance();
//Get the result of the parallel computation.
return result.get();
}
private static class Task implements Runnable {
/** The Fibonacci sequence index of this task. */
private final int index;
/** The shared result of the computation. */
private final AtomicLong result;
/** The synchronizer. */
private final Phaser phaser;
public Task(int n, AtomicLong result, Phaser phaser) {
index = n;
this.result = result;
this.phaser = phaser;
//Inform synchronizer of additional work to complete.
phaser.register();
}
@Override
public void run() {
if (index == 1) {
result.incrementAndGet();
} else if (index > 1) {
//recurrence relation: Fn = Fn-1 + Fn-2
Task task1 = new Task(index - 1, result, phaser);
Task task2 = new Task(index - 2, result, phaser);
executors.submit(task1);
executors.submit(task2);
}
//Notify synchronizer of task completion.
phaser.arrive();
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
11680 次 |
| 最近记录: |