ExecutorService,避免任务队列过满的标准方法

man*_*ana 29 java concurrency

我正在使用ExecutorService以方便并发多线程程序.请参考以下代码:

while(xxx) {
    ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
    ...  
    Future<..> ... = exService.submit(..);
    ...
}
Run Code Online (Sandbox Code Playgroud)

在我的情况下,问题是submit()如果所有人NUMBER_THREADS都被占用则不会阻塞.结果是任务队列被许多任务淹没.这样做的结果是,关闭执行服务ExecutorService.shutdown()需要ExecutorService.isTerminated()很长时间(长时间都是假的).原因是任务队列仍然很满.

现在我的解决方法是使用信号量来禁止在任务队列中包含许多条目ExecutorService:

...
Semaphore semaphore=new Semaphore(NUMBER_THREADS);

while(xxx) {
    ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS); 
    ...
    semaphore.aquire();  
    // internally the task calls a finish callback, which invokes semaphore.release()
    // -> now another task is added to queue
    Future<..> ... = exService.submit(..); 
    ...
}
Run Code Online (Sandbox Code Playgroud)

我确信有更好的封装解决方案?

Ada*_*ent 27

诀窍是使用固定的队列大小和:

new ThreadPoolExecutor.CallerRunsPolicy()
Run Code Online (Sandbox Code Playgroud)

我还建议使用Guava的ListeningExecutorService.以下是消费者/生产者队列的示例.

private ListeningExecutorService producerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
private ListeningExecutorService consumerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));

private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  5000L, TimeUnit.MILLISECONDS,
                                  new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}
Run Code Online (Sandbox Code Playgroud)

更好的是你可能想要像RabbitMQ或ActiveMQ这样的MQ,因为他们有QoS技术.


Pet*_*rey 6

您可以调用ThreadPoolExecutor.getQueue().size()以查找等待队列的大小.如果队列太长,您可以采取措施.如果队列太长而不能减慢生产者的速度(如果合适的话),我建议在当前线程中运行任务.


Kev*_*vin 5

你最好自己创建ThreadPoolExecutor(这是Executors.newXXX()无论如何都要做的).

在构造函数中,您可以传入BlockingQueue以供Executor用作其任务队列.如果传入一个大小受限的BlockingQueue(如LinkedBlockingQueue),它应该达到你想要的效果.

ExecutorService exService = new ThreadPoolExecutor(NUMBER_THREADS, NUMBER_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(workQueueSize));
Run Code Online (Sandbox Code Playgroud)

  • 我试了一下.不幸的是你的解决方案没有阻止(我想要)但抛出RejectedExecutionException.还发现:http://www.velocityreviews.com/forums/t389526-threadpoolexecutor-with-blocking-execute.html.所提出的解决方法似乎更复杂,因为我的信号量示例,该死的! (7认同)
  • 如果队列已满,由于RejectedExecutionException,这不起作用 (4认同)

mdm*_*dma 5

真正的阻塞ThreadPoolExecutor已出现在许多人的心愿单上,甚至还打开了一个JDC错误。我遇到了同样的问题,并且遇到了这个问题:http : //today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html

它是BlockingThreadPoolExecutor的实现,使用RejectionPolicy实现,该RejectionPolicy使用offer将任务添加到队列中,等待队列中有空间。看起来不错。