灵活的CountDownLatch?

Ada*_*ski 24 java concurrency multithreading countdownlatch phaser

我现在遇到了两次问题,即生产者线程生成N个工作项,将它们提交给a ExecutorService,然后需要等到所有N个项都被处理完毕.

注意事项

  • N事先不知道.如果是这样的话,我只需创建一个CountDownLatch然后拥有生产者线程,await()直到所有工作完成.
  • 使用a CompletionService是不合适的,因为虽然我的生产者线程需要阻塞(即通过调用take()),但是没有办法表明所有工作都已完成,导致生产者线程停止等待.

我目前最喜欢的解决方案是使用整数计数器,并在提交工作项时递增它,并在处理工作项时递减它.在所有N个任务的提交之后,我的生产者线程将需要等待锁定,检查是否counter == 0通知它.如果消费者线程已经递减计数器并且新值为0,则消费者线程将需要通知生产者.

有没有更好的方法解决这个问题,或者java.util.concurrent我应该使用合适的构造而不是"滚动自己的"?

提前致谢.

Joh*_*int 28

java.util.concurrent.Phaser看起来它适合你.计划在Java 7中发布,但最稳定的版本可以在jsr166的兴趣小组网站上找到.

移相器是一个美化的循环屏障.您可以注册N个参与方,并在准备好等待特定阶段的预付款时.

一个关于它如何工作的简单示例:

final Phaser phaser = new Phaser();

public Runnable getRunnable(){
    return new Runnable(){
        public void run(){
            ..do stuff...
            phaser.arriveAndDeregister();
        }
    };
}
public void doWork(){
    phaser.register();//register self
    for(int i=0 ; i < N; i++){
        phaser.register(); // register this task prior to execution 
        executor.submit( getRunnable());
    }
    phaser.arriveAndAwaitAdvance();
}
Run Code Online (Sandbox Code Playgroud)

  • 很酷 - 谢谢; 这看起来就像我正在追求的......不敢相信他们称之为Phaser. (3认同)
  • 希望我能再次投票x100.非常感谢. (2认同)
  • 如果`getRunnable()`的意思是表示只发信号通知相位器然后结束的任务,你想调用`phaser.arriveAndDeregister()`和**而不是**'phaser.arrive()`,否则当父任务再次调用`phaser.arriveAndAwaitAdvance()`它将死锁,因为任务将等待仍在相位器中注册的已完成任务. (2认同)