生产者/消费者工作队列

Jol*_*ger 10 java concurrency producer-consumer executorservice blockingqueue

我正在努力实现我的处理管道的最佳方法.

我的制作人将作品提供给BlockingQueue.在消费者方面,我轮询队列,包装我在Runnable任务中获得的内容,并将其提交给ExecutorService.

while (!isStopping())
{
    String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
    if (work == null)
    {
        break;
    }
    executorService.execute(new Worker(work));   // needs to block if no threads!
}
Run Code Online (Sandbox Code Playgroud)

这不理想; 当然,ExecutorService有自己的队列,所以真正发生的事情是我总是完全耗尽我的工作队列并填充任务队列,随着任务的完成,队列会慢慢排空.

我意识到我可以在生产者端排队任务,但我真的不愿意这样做 - 我喜欢我的工作队列的间接/隔离是愚蠢的字符串; 它真的不是生产者的任何事情会发生在他们身上.迫使生产者对Runnable或Callable进行排队会破坏抽象,恕我直言.

但我确实希望共享工作队列代表当前的处理状态.如果消费者没有跟上,我希望能够阻止生产者.

我喜欢使用Executors,但我觉得我正在与他们的设计作斗争.我可以部分喝Kool-ade,还是我必须吞下它?我是否正在抵制排队任务?(我怀疑我可以设置ThreadPoolExecutor来使用1任务队列并覆盖它的执行方法来阻止而不是拒绝队列满,但这感觉很糟糕.)

建议?

Kev*_*vin 19

我希望共享工作队列代表当前的处理状态.

尝试使用共享的BlockingQueue 并使用一个工作线程池来从队列中取出工作项.

如果消费者没有跟上,我希望能够阻止生产者.

无论ArrayBlockingQueue的LinkedBlockingQueue支持有界队列,使得它们在放置块满时.使用blocking put()方法可确保在队列已满时阻止生产者.

这是一个艰难的开始.您可以调整工作人员数和队列大小:

public class WorkerTest<T> {

    private final BlockingQueue<T> workQueue;
    private final ExecutorService service;

    public WorkerTest(int numWorkers, int workQueueSize) {
        workQueue = new LinkedBlockingQueue<T>(workQueueSize);
        service = Executors.newFixedThreadPool(numWorkers);

        for (int i=0; i < numWorkers; i++) {
            service.submit(new Worker<T>(workQueue));
        }
    }

    public void produce(T item) {
        try {
            workQueue.put(item);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }


    private static class Worker<T> implements Runnable {
        private final BlockingQueue<T> workQueue;

        public Worker(BlockingQueue<T> workQueue) {
            this.workQueue = workQueue;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    T item = workQueue.take();
                    // Process item
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)