是否可以向ThreadPoolExecutor的BlockingQueue添加任务?

too*_*ear 18 java concurrency producer-consumer executorservice blockingqueue

ThreadPoolExecutor的JavaDoc 不清楚是否可以将任务直接添加到BlockingQueue执行程序的后台.文档称调用executor.getQueue()"主要用于调试和监视".

我正ThreadPoolExecutor用自己的方式构建一个BlockingQueue.我保留对队列的引用,以便我可以直接向其添加任务.返回相同的队列,getQueue()因此我假设admonition getQueue()适用于通过我的方式获取的对后备队列的引用.

代码的一般模式是:

int n = ...; // number of threads
queue = new ArrayBlockingQueue<Runnable>(queueSize);
executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue);
executor.prestartAllCoreThreads();
// ...
while (...) {
    Runnable job = ...;
    queue.offer(job, 1, TimeUnit.HOURS);
}
while (jobsOutstanding.get() != 0) {
    try {
        Thread.sleep(...);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
executor.shutdownNow();
Run Code Online (Sandbox Code Playgroud)

queue.offer() VS executor.execute()

据我了解,典型的用途是通过添加任务executor.execute().上面示例中的方法具有阻塞队列的优点,但execute()如果队列已满并立即失败并拒绝我的任务.我也喜欢提交作业与阻塞队列交互; 对我来说,这感觉更"纯粹"的生产者 - 消费者.

直接向队列添加任务的含义:我必须调用,prestartAllCoreThreads()否则没有工作线程正在运行.假设没有与执行程序的其他交互,则不会监视队列(检查ThreadPoolExecutor源确认了这一点).这也意味着直接排队ThreadPoolExecutor必须另外为> 0核心线程配置,并且不得配置为允许核心线程超时.

TL;博士

给定ThreadPoolExecutor配置如下:

  • 核心线程> 0
  • 核心线程不允许超时
  • 核心线程是预先启动的
  • 提及对BlockingQueue执行人的支持

将任务直接添加到队列而不是调用是否可以接受executor.execute()

有关

这个问题(生产者/消费者工作队列)是类似的,但没有具体涉及直接添加到队列.

jta*_*orn 11

一个技巧是实现ArrayBlockingQueue的自定义子类并覆盖offer()方法来调用阻塞版本,然后您仍然可以使用正常的代码路径.

queue = new ArrayBlockingQueue<Runnable>(queueSize) {
  @Override public boolean offer(Runnable runnable) {
    try {
      return offer(runnable, 1, TimeUnit.HOURS);
    } catch(InterruptedException e) {
      // return interrupt status to caller
      Thread.currentThread().interrupt();
    }
    return false;
  }
};
Run Code Online (Sandbox Code Playgroud)

(正如你可能猜到的那样,我认为直接在队列中调用offer是正常的代码路径可能是一个坏主意).


Rob*_*ska 9

如果是我,我宁愿使用Executor#execute()Queue#offer(),仅仅是因为我使用的是一切从别的java.util.concurrent了.

你的问题很好,引起了我的兴趣,所以我看了一下这个来源ThreadPoolExecutor#execute():

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}
Run Code Online (Sandbox Code Playgroud)

我们可以看到执行本身调用offer()工作队列,但在必要之前不做一些漂亮,美味的池操作.出于这个原因,我认为使用它是明智的execute().不使用它可能(虽然我不确定)导致池以非最佳方式运行.但是,我不认为使用offer()破坏执行程序 - 看起来使用以下(也来自ThreadPoolExecutor)从队列中删除任务:

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

这个getTask()方法只是在循环中调用,所以如果执行程序没有关闭,它就会阻塞,直到给队列一个新任务(无论它来自哪里).

注意:尽管我在这里发布了源代码片段,但我们不能依赖它们来获得明确的答案 - 我们应该只编写API.我们不知道实施execute()将如何随着时间而改变.