too*_*ear 18 java concurrency producer-consumer executorservice blockingqueue
ThreadPoolExecutor的JavaDoc 不清楚是否可以将任务直接添加到BlockingQueue
执行程序的后台.文档称调用executor.getQueue()
"主要用于调试和监视".
我正ThreadPoolExecutor
用自己的方式构建一个BlockingQueue
.我保留对队列的引用,以便我可以直接向其添加任务.返回相同的队列,getQueue()
因此我假设admonition getQueue()
适用于通过我的方式获取的对后备队列的引用.
代码的一般模式是:
int n = ...; // number of threads
queue = new ArrayBlockingQueue<Runnable>(queueSize);
executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue);
executor.prestartAllCoreThreads();
// ...
while (...) {
Runnable job = ...;
queue.offer(job, 1, TimeUnit.HOURS);
}
while (jobsOutstanding.get() != 0) {
try {
Thread.sleep(...);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
executor.shutdownNow();
Run Code Online (Sandbox Code Playgroud)
queue.offer()
VS executor.execute()
据我了解,典型的用途是通过添加任务executor.execute()
.上面示例中的方法具有阻塞队列的优点,但execute()
如果队列已满并立即失败并拒绝我的任务.我也喜欢提交作业与阻塞队列交互; 对我来说,这感觉更"纯粹"的生产者 - 消费者.
直接向队列添加任务的含义:我必须调用,prestartAllCoreThreads()
否则没有工作线程正在运行.假设没有与执行程序的其他交互,则不会监视队列(检查ThreadPoolExecutor
源确认了这一点).这也意味着直接排队ThreadPoolExecutor
必须另外为> 0核心线程配置,并且不得配置为允许核心线程超时.
给定ThreadPoolExecutor
配置如下:
BlockingQueue
执行人的支持将任务直接添加到队列而不是调用是否可以接受executor.execute()
?
这个问题(生产者/消费者工作队列)是类似的,但没有具体涉及直接添加到队列.
jta*_*orn 11
一个技巧是实现ArrayBlockingQueue的自定义子类并覆盖offer()方法来调用阻塞版本,然后您仍然可以使用正常的代码路径.
queue = new ArrayBlockingQueue<Runnable>(queueSize) {
@Override public boolean offer(Runnable runnable) {
try {
return offer(runnable, 1, TimeUnit.HOURS);
} catch(InterruptedException e) {
// return interrupt status to caller
Thread.currentThread().interrupt();
}
return false;
}
};
Run Code Online (Sandbox Code Playgroud)
(正如你可能猜到的那样,我认为直接在队列中调用offer是正常的代码路径可能是一个坏主意).
如果是我,我宁愿使用Executor#execute()
过Queue#offer()
,仅仅是因为我使用的是一切从别的java.util.concurrent
了.
你的问题很好,引起了我的兴趣,所以我看了一下这个来源ThreadPoolExecutor#execute()
:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
Run Code Online (Sandbox Code Playgroud)
我们可以看到执行本身调用offer()
工作队列,但在必要之前不做一些漂亮,美味的池操作.出于这个原因,我认为使用它是明智的execute()
.不使用它可能(虽然我不确定)导致池以非最佳方式运行.但是,我不认为使用offer()
会破坏执行程序 - 看起来使用以下(也来自ThreadPoolExecutor)从队列中删除任务:
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
Run Code Online (Sandbox Code Playgroud)
这个getTask()
方法只是在循环中调用,所以如果执行程序没有关闭,它就会阻塞,直到给队列一个新任务(无论它来自哪里).
注意:尽管我在这里发布了源代码片段,但我们不能依赖它们来获得明确的答案 - 我们应该只编写API.我们不知道实施execute()
将如何随着时间而改变.