Chr*_*oph 32 java concurrency multithreading executorservice
我用a ExecutorService来执行任务.此任务可以递归地创建提交给它的其他任务,ExecutorService这些子任务也可以这样做.
我现在遇到的问题是,在我继续之前,我要等到所有任务完成(即所有任务都完成并且他们没有提交新任务).
我不能ExecutorService.shutdown()在主线程中调用,因为这可以防止新任务被接受ExecutorService.
ExecutorService.awaitTermination()如果shutdown没有被召唤,呼叫似乎什么都不做.
所以我有点卡在这里.ExecutorService要看到所有工人都闲着,这不是很难,是吗?我能想出的唯一不合理的解决方案是直接使用a ThreadPoolExecutor并getPoolSize()偶尔查询它.这样做真的没有更好的方法吗?
axt*_*avt 17
如果递归任务树中的任务数量最初是未知的,那么最简单的方法可能是实现您自己的同步原语,某种"逆信号量",并在您的任务中共享它.在提交每个任务之前,您需要递增一个值,当任务完成时,它会递减该值,并等到值为0.
将其实现为从任务显式调用的单独原语将此逻辑与线程池实现分离,并允许您将几个独立的递归任务树提交到同一个池中.
像这样的东西:
public class InverseSemaphore {
private int value = 0;
private Object lock = new Object();
public void beforeSubmit() {
synchronized(lock) {
value++;
}
}
public void taskCompleted() {
synchronized(lock) {
value--;
if (value == 0) lock.notifyAll();
}
}
public void awaitCompletion() throws InterruptedException {
synchronized(lock) {
while (value > 0) lock.wait();
}
}
}
Run Code Online (Sandbox Code Playgroud)
请注意,taskCompleted()应在finally块内调用,以使其免受可能的异常的影响.
另请注意,beforeSubmit()应该在提交任务之前由提交线程调用,而不是由任务本身调用,以避免在旧任务完成且新的任务尚未启动时可能出现"错误完成".
编辑:修复了使用模式的重要问题.
Joh*_*int 17
这真的是Phaser的理想候选者.Java 7将推出这个新类.它灵活的CountdonwLatch/CyclicBarrier.您可以在JSR 166兴趣点获得稳定版本.
它是一种更灵活的CountdownLatch/CyclicBarrier的方式是因为它不仅能够支持未知数量的聚会(线程),而且还可以重复使用(这是相位部分进入的地方)
对于您提交的每项任务,您都会注册,当该任务完成后,您将到达.这可以递归地完成.
Phaser phaser = new Phaser();
ExecutorService e = //
Runnable recursiveRunnable = new Runnable(){
public void run(){
//do work recursively if you have to
if(shouldBeRecursive){
phaser.register();
e.submit(recursiveRunnable);
}
phaser.arrive();
}
}
public void doWork(){
int phase = phaser.getPhase();
phaser.register();
e.submit(recursiveRunnable);
phaser.awaitAdvance(phase);
}
Run Code Online (Sandbox Code Playgroud)
编辑:感谢@depthofreality指出前一个例子中的竞争条件.我正在更新它,以便执行线程只等待当前阶段的前进,因为它阻止递归函数完成.
在arrives == registers 的数量之前,阶段号不会跳闸.由于在每次递归调用之前调用register,所有调用完成时都会发生相位增量.
哇,你们快点:)
谢谢你提出的所有建议.期货不容易与我的模型集成,因为我不知道预先安排了多少runnables.因此,如果我将父任务保持活着只是为了等待它的递归子任务完成,我会有很多垃圾.
我使用AtomicInteger建议解决了我的问题.本质上,我将ThreadPoolExecutor子类化,并在调用execute()时递增计数器,并在调用afterExecute()时递减.当计数器变为0时,我调用shutdown().这似乎对我的问题有用,不确定这是否是一种通常很好的方法.特别是,我假设你只使用execute()来添加Runnables.
作为一个侧节点:我首先尝试检查afterExecute()队列中Runnables的数量以及当它们为0时活动和关闭的worker数量; 但这不起作用,因为并非所有Runnables都出现在队列中,并且getActiveCount()没有按照我的预期行事.
无论如何,这是我的解决方案:(如果有人发现此问题严重,请告诉我:)
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
private final AtomicInteger executing = new AtomicInteger(0);
public MyThreadPoolExecutor(int coorPoolSize, int maxPoolSize, long keepAliveTime,
TimeUnit seconds, BlockingQueue<Runnable> queue) {
super(coorPoolSize, maxPoolSize, keepAliveTime, seconds, queue);
}
@Override
public void execute(Runnable command) {
//intercepting beforeExecute is too late!
//execute() is called in the parent thread before it terminates
executing.incrementAndGet();
super.execute(command);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
int count = executing.decrementAndGet();
if(count == 0) {
this.shutdown();
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
10525 次 |
| 最近记录: |