如何阻止BlockingQueue为空?

gd1*_*gd1 18 java concurrency blockingqueue

我正在寻找一种方法来阻止,直到BlockingQueue空.

我知道,在多线程环境中,只要有生产者将物品放入其中BlockingQueue,就会出现队列变空的情况,并且在几纳秒之后它就会充满物品.

但是,如果只有一个生产者,那么它可能希望等待(并阻止),直到队列在停止将项目放入队列后为空.

的Java /伪代码:

// Producer code
BlockingQueue queue = new BlockingQueue();

while (having some tasks to do) {
    queue.put(task);
}

queue.waitUntilEmpty(); // <-- how to do this?

print("Done");
Run Code Online (Sandbox Code Playgroud)

你有什么主意吗?

编辑:我知道包装BlockingQueue和使用额外的条件可以解决问题,我只是问是否有一些预先制定的解决方案和/或更好的替代品.

nic*_*are 18

使用wait()和的简单解决方案notify():

// Producer:
synchronized(queue) {
    while (!queue.isEmpty())
        queue.wait(); //wait for the queue to become empty
    queue.put();
}

//Consumer:
synchronized(queue) {
    queue.get();
    if (queue.isEmpty())
        queue.notify(); // notify the producer
}
Run Code Online (Sandbox Code Playgroud)

  • 这怎么样?假设消费者首先运行并在queue.get()上被阻塞,因为队列中还没有项目.现在生产者无法生产,因为它无法获得锁定. (4认同)
  • @liuyaodong不,读取`Object.wait()`的Javadoc.它释放锁,直到另一个线程调用`notify`.它甚至明确地说"当前线程必须拥有此对象的监视器". (3认同)

小智 7

我知道你可能已经有很多线程主动轮询或排队,但我仍觉得你的流程/设计不太正确.

队列变空并不意味着先前添加的任务已完成,某些项目可能需要很长时间才能处理,因此检查空白并不太有用.

所以你应该忘记的是BlockingQueue,你可以像任何其他收藏品一样使用它.翻译项目成CollectionsCallable,并利用的ExecutorService.invokeAll().

    Collection<Item> queue = ...
    Collection<Callable<Result>> tasks = new ArrayList<Callable<Result>>();

    for (Item item : queue) {
        tasks.add(new Callable<Result>() {

            @Override
            public Result call() throws Exception {
                // process the item ...

                return result;
            }
        });
    }

    // look at the results, add timeout for invokeAll if necessary
    List<Future<Result>> results = executorService.invokeAll(tasks);

    // done
Run Code Online (Sandbox Code Playgroud)

这种方法可以让您完全控制生产者等待的时间和正确的异常处理.