执行程序:如果递归创建任务,如何同步等待所有任务完成?

Eri*_*ric 12 java concurrency threadpoolexecutor

我的问题与问题密切相关.正如在那里发布的那样,我希望主线程等到工作队列为空并且所有任务都已完成.然而,在我的情况下,问题是每个任务可以递归地导致提交新任务以进行处理.这使收集所有这些任务的未来变得有点尴尬.

我们当前的解决方案使用忙等待循环来等待终止:

        do { //Wait until we are done the processing
      try {
        Thread.sleep(200);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    } while (!executor.getQueue().isEmpty()
             || numTasks.longValue() > executor.getCompletedTaskCount());
Run Code Online (Sandbox Code Playgroud)

numTasks是一个在创建每个新任务时增加的值.这有效但我认为由于忙碌的等待而不是很好.我想知道是否有一种好方法可以使主线程同步等待,直到被明确唤醒.

Eri*_*ric 6

非常感谢你的所有建议!

最后,我选择了一些我认为相当简单的东西.我发现CountDownLatch几乎就是我所需要的.它会阻塞,直到计数器达到0.唯一的问题是它只能倒计时,而不是向上,因此在动态设置中不起作用,我可以在任务中提交新任务.因此,我实现了一个CountLatch提供附加功能的新类.(见下文)这个课我然后使用如下.

主线程调用latch.awaitZero(),阻塞直到锁存器达到0.

任何线索,调用之前executor.execute(..)调用latch.increment().

在完成之前的任何任务都会打电话latch.decrement().

当最后一个任务终止时,计数器将达到0,从而释放主线程.

欢迎提供进一步的建议和反馈!

public class CountLatch {

@SuppressWarnings("serial")
private static final class Sync extends AbstractQueuedSynchronizer {

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected int acquireNonBlocking(int acquires) {
        // increment count
        for (;;) {
            int c = getState();
            int nextc = c + 1;
            if (compareAndSetState(c, nextc))
                return 1;
        }
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c - 1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

private final Sync sync;

public CountLatch(int count) {
    this.sync = new Sync(count);
}

public void awaitZero() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public boolean awaitZero(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void increment() {
    sync.acquireNonBlocking(1);
}

public void decrement() {
    sync.releaseShared(1);
}

public String toString() {
    return super.toString() + "[Count = " + sync.getCount() + "]";
}

}
Run Code Online (Sandbox Code Playgroud)

需要注意的是increment()/ decrement()呼叫可以被封装到一个自定义的Executor子类,因为有人建议,例如,由萨米尔霍宁,或beforeExecuteafterExecute作为被IMPL建议.看这里:

public class CountingThreadPoolExecutor extends ThreadPoolExecutor {

protected final CountLatch numRunningTasks = new CountLatch(0);

public CountingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

@Override
public void execute(Runnable command) {
    numRunningTasks.increment();
    super.execute(command);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
    numRunningTasks.decrement();
    super.afterExecute(r, t);
}

/**
 * Awaits the completion of all spawned tasks.
 */
public void awaitCompletion() throws InterruptedException {
    numRunningTasks.awaitZero();
}

/**
 * Awaits the completion of all spawned tasks.
 */
public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
    numRunningTasks.awaitZero(timeout, unit);
}

}
Run Code Online (Sandbox Code Playgroud)


小智 5

Java 7提供了一个适合这个名为Phaser的用例的同步.它是CountDownLatch和CyclicBarrier的可重用混合体,可以增加和减少注册方的数量(类似于可递增的CountDownLatch).

在此方案中使用相位器的基本模式是在创建时使用移相器注册任务,并在完成时到达.当到达方的数量与登记的数量匹配时,相位器"前进"到下一阶段,在进行时通知任何等待线程.

这是我创建的等待递归任务完成的示例.为了演示目的,它天真地找到Fibonacci序列的前几个数字:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicLong;

/**
 * An example of using a Phaser to wait for the completion of recursive tasks.
 * @author Voxelot
 */
public class PhaserExample {
    /** Workstealing threadpool with reduced queue contention. */
    private static ForkJoinPool executors;

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) throws InterruptedException {
        executors = new ForkJoinPool();
        List<Long> sequence = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            sequence.add(fib(i));
        }
        System.out.println(sequence);
    }

    /**
     * Computes the nth Fibonacci number in the Fibonacci sequence.
     * @param n The index of the Fibonacci number to compute
     * @return The computed Fibonacci number
     */
    private static Long fib(int n) throws InterruptedException {
        AtomicLong result = new AtomicLong();
        //Flexible sychronization barrier
        Phaser phaser = new Phaser();
        //Base task
        Task initialTask = new Task(n, result, phaser);
        //Register fib(n) calling thread
        phaser.register();
        //Submit base task
        executors.submit(initialTask);
        //Make the calling thread arrive at the synchronization
        //barrier and wait for all future tasks to arrive.
        phaser.arriveAndAwaitAdvance();
        //Get the result of the parallel computation.
        return result.get();
    }

    private static class Task implements Runnable {
        /** The Fibonacci sequence index of this task. */
        private final int index;
        /** The shared result of the computation. */
        private final AtomicLong result;
        /** The synchronizer. */
        private final Phaser phaser;

        public Task(int n, AtomicLong result, Phaser phaser) {
            index = n;
            this.result = result;
            this.phaser = phaser;
            //Inform synchronizer of additional work to complete.
            phaser.register();
        }

        @Override
        public void run() {
            if (index == 1) {
                result.incrementAndGet();
            } else if (index > 1) {
                //recurrence relation: Fn = Fn-1 + Fn-2
                Task task1 = new Task(index - 1, result, phaser);
                Task task2 = new Task(index - 2, result, phaser);
                executors.submit(task1);
                executors.submit(task2);
            }
            //Notify synchronizer of task completion.
            phaser.arrive();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)