具有停止条件的Java生产者 - 消费者

mar*_*ssi 5 java concurrency producer-consumer

我有N个工作人员共享要计算的元素队列.在每次迭代中,每个工作程序从队列中删除一个元素,并且可以生成更多要计算的元素,这些元素将被放在同一个队列中.基本上,每个生产者也是消费者.当队列中没有元素并且所有工作者已经完成计算当前元素时,计算结束(因此不能再生成要计算的元素).我想避免调度员/协调员,所以工人应该协调.允许工人查明暂停条件是否有效的最佳模式是什么,因此代表其他人停止计算?

例如,如果所有线程都只执行此循环,那么当所有元素都被计算出来时,它将导致所有线程被永久阻塞:

while (true) {
    element = queue.poll();
    newElements[] = compute(element);
    if (newElements.length > 0) {
        queue.addAll(newElements);
    }
}
Run Code Online (Sandbox Code Playgroud)

Zim*_*oot 6

保持活动线程的数量.

public class ThreadCounter {
    public static final AtomicInteger threadCounter = new AtomicInteger(N);
    public static final AtomicInteger queueCounter = new AtomicInteger(0);
    public static final Object poisonPill = new Object();
    public static volatile boolean cancel = false; // or use a final AomticBoolean instead
}
Run Code Online (Sandbox Code Playgroud)

您的线程的轮询循环应如下所示(我假设您正在使用BlockingQueue)

while(!ThreadCounter.cancel) {
    int threadCount = ThreadCounter.threadCounter.decrementAndGet(); // decrement before blocking
    if(threadCount == 0 && ThreadCounter.queueCounter.get() == 0) {
        ThreadCounter.cancel = true;
        queue.offer(ThreadCounter.poisonPill);
    } else {
        Object obj = queue.take();
        ThreadCounter.threadCounter.incrementAndGet(); // increment when the thread is no longer blocking
        ThreadCounter.queueCounter.decrementAndGet();
        if(obj == ThreadCounter.poisonPill) {
            queue.offer(obj); // send the poison pill back through the queue so the other threads can read it
            continue;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

如果线程即将阻塞,BlockingQueue那么它会递减计数器; 如果所有线程都在等待队列(意味着counter == 0),则最后一个线程设置cancel为true,然后通过队列发送毒丸以唤醒其他线程; 每个线程看到毒丸,通过队列发回它以唤醒剩余的线程,然后当它看到cancel设置为true 时退出循环.

编辑:我通过添加一个queueCounter维护队列中对象数量的计数来删除数据竞争(显然,您还需要queueCounter.incrementAndGet()在将对象添加到队列的任何位置添加调用).这样做如下:if threadCount == 0,但是queueCount != 0,这意味着一个线程刚刚从队列中删除了一个项目但尚未调用threadCount.getAndIncrement,因此cancel变量设置为true.threadCount.getAndIncrement调用在调用之前是很重要的queueCount.getAndDecrement,否则你仍然会有数据竞争.你调用的顺序并不重要,queueCount.getAndIncrement因为你不会将它与调用交错threadCount.getAndDecrement(后者将在循环结束时调用,前者将在循环开始时调用).

请注意,您不能仅使用a queueCount来确定何时结束进程,因为线程可能仍然处于活动状态而尚未在队列中放置任何数据 - 换句话说,queueCount将为零,但是一旦该线程将为非零线程完成了当前的迭代.

poisonPill您可以改为通过队列发送取消线程(N-1),而不是通过队列重复发送poisonPills.如果您使用不同的队列使用此方法,请务必谨慎,因为某些队列(例如亚马逊的简单队列服务)可能会返回与其take方法等效的多个项目,在这种情况下,您需要重复发送poisonPill以确保一切关闭.

此外,while(!cancel)您可以使用while(true)循环并在循环检测到时断开,而不是使用循环poisonPill