我正在努力实现我的处理管道的最佳方法.
我的制作人将作品提供给BlockingQueue.在消费者方面,我轮询队列,包装我在Runnable任务中获得的内容,并将其提交给ExecutorService.
while (!isStopping())
{
String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
if (work == null)
{
break;
}
executorService.execute(new Worker(work)); // needs to block if no threads!
}
Run Code Online (Sandbox Code Playgroud)
这不理想; 当然,ExecutorService有自己的队列,所以真正发生的事情是我总是完全耗尽我的工作队列并填充任务队列,随着任务的完成,队列会慢慢排空.
我意识到我可以在生产者端排队任务,但我真的不愿意这样做 - 我喜欢我的工作队列的间接/隔离是愚蠢的字符串; 它真的不是生产者的任何事情会发生在他们身上.迫使生产者对Runnable或Callable进行排队会破坏抽象,恕我直言.
但我确实希望共享工作队列代表当前的处理状态.如果消费者没有跟上,我希望能够阻止生产者.
我喜欢使用Executors,但我觉得我正在与他们的设计作斗争.我可以部分喝Kool-ade,还是我必须吞下它?我是否正在抵制排队任务?(我怀疑我可以设置ThreadPoolExecutor来使用1任务队列并覆盖它的执行方法来阻止而不是拒绝队列满,但这感觉很糟糕.)
建议?
java concurrency producer-consumer executorservice blockingqueue