Java ExecutorService:awaitTermination所有递归创建的任务

Chr*_*oph 32 java concurrency multithreading executorservice

我用a ExecutorService来执行任务.此任务可以递归地创建提交给它的其他任务,ExecutorService这些子任务也可以这样做.

我现在遇到的问题是,在我继续之前,我要等到所有任务完成(即所有任务都完成并且他们没有提交新任务).

我不能ExecutorService.shutdown()在主线程中调用,因为这可以防止新任务被接受ExecutorService.

ExecutorService.awaitTermination()如果shutdown没有被召唤,呼叫似乎什么都不做.

所以我有点卡在这里.ExecutorService要看到所有工人都闲着,这不是很难,是吗?我能想出的唯一不合理的解决方案是直接使用a ThreadPoolExecutorgetPoolSize()偶尔查询它.这样做真的没有更好的方法吗?

axt*_*avt 17

如果递归任务树中的任务数量最初是未知的,那么最简单的方法可能是实现您自己的同步原语,某种"逆信号量",并在您的任务中共享它.在提交每个任务之前,您需要递增一个值,当任务完成时,它会递减该值,并等到值为0.

将其实现为从任务显式调用的单独原语将此逻辑与线程池实现分离,并允许您将几个独立的递归任务树提交到同一个池中.

像这样的东西:

public class InverseSemaphore {
    private int value = 0;
    private Object lock = new Object();

    public void beforeSubmit() {
        synchronized(lock) {
            value++;
        }
    }

    public void taskCompleted() {
        synchronized(lock) {
            value--;
            if (value == 0) lock.notifyAll();
        }
    }

    public void awaitCompletion() throws InterruptedException {
        synchronized(lock) {
            while (value > 0) lock.wait();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

请注意,taskCompleted()应在finally块内调用,以使其免受可能的异常的影响.

另请注意,beforeSubmit()应该在提交任务之前由提交线程调用,而不是由任务本身调用,以避免在旧任务完成且新的任务尚未启动时可能出现"错误完成".

编辑:修复了使用模式的重要问题.


Joh*_*int 17

这真的是Phaser的理想候选者.Java 7将推出这个新类.它灵活的CountdonwLatch/CyclicBarrier.您可以在JSR 166兴趣点获得稳定版本.

它是一种更灵活的CountdownLatch/CyclicBarrier的方式是因为它不仅能够支持未知数量的聚会(线程),而且还可以重复使用(这是相位部分进入的地方)

对于您提交的每项任务,您都会注册,当该任务完成后,您将到达.这可以递归地完成.

Phaser phaser = new Phaser();
ExecutorService e = //

Runnable recursiveRunnable = new Runnable(){
   public void run(){
      //do work recursively if you have to

      if(shouldBeRecursive){
           phaser.register();
           e.submit(recursiveRunnable);
      }

      phaser.arrive();
   }
}

public void doWork(){
   int phase = phaser.getPhase();

   phaser.register();
   e.submit(recursiveRunnable);

   phaser.awaitAdvance(phase);
}
Run Code Online (Sandbox Code Playgroud)

编辑:感谢@depthofreality指出前一个例子中的竞争条件.我正在更新它,以便执行线程只等待当前阶段的前进,因为它阻止递归函数完成.

arrives == registers 的数量之前,阶段号不会跳闸.由于在每次递归调用之前调用register,所有调用完成时都会发生相位增量.


Chr*_*oph 6

哇,你们快点:)

谢谢你提出的所有建议.期货不容易与我的模型集成,因为我不知道预先安排了多少runnables.因此,如果我将父任务保持活着只是为了等待它的递归子任务完成,我会有很多垃圾.

我使用AtomicInteger建议解决了我的问题.本质上,我将ThreadPoolExecutor子类化,并在调用execute()时递增计数器,并在调用afterExecute()时递减.当计数器变为0时,我调用shutdown().这似乎对我的问题有用,不确定这是否是一种通常很好的方法.特别是,我假设你只使用execute()来添加Runnables.

作为一个侧节点:我首先尝试检查afterExecute()队列中Runnables的数量以及当它们为0时活动和关闭的worker数量; 但这不起作用,因为并非所有Runnables都出现在队列中,并且getActiveCount()没有按照我的预期行事.

无论如何,这是我的解决方案:(如果有人发现此问题严重,请告诉我:)

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

    private final AtomicInteger executing = new AtomicInteger(0);

    public MyThreadPoolExecutor(int coorPoolSize, int maxPoolSize, long keepAliveTime,
        TimeUnit seconds, BlockingQueue<Runnable> queue) {
        super(coorPoolSize, maxPoolSize, keepAliveTime, seconds, queue);
    }


    @Override
    public void execute(Runnable command) {
        //intercepting beforeExecute is too late!
        //execute() is called in the parent thread before it terminates
        executing.incrementAndGet();
        super.execute(command);
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        int count = executing.decrementAndGet();
        if(count == 0) {
            this.shutdown();
        }
    }

}
Run Code Online (Sandbox Code Playgroud)