Gra*_*ray 90 java multithreading blockingqueue threadpoolexecutor
我已经沮丧了一段时间,其默认行为ThreadPoolExecutor支持ExecutorService我们这么多人使用的线程池.引用Javadocs:
如果有多个corePoolSize但运行的maximumPoolSize线程少于maximumPoolSize,则只有在队列已满时才会创建新线程.
这意味着如果您使用以下代码定义线程池,它将永远不会启动第二个线程,因为它LinkedBlockingQueue是无限制的.
ExecutorService threadPool =
new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));
Run Code Online (Sandbox Code Playgroud)
只有当您有一个有界队列并且队列已满时才会启动核心编号之上的任何线程.我怀疑大量的初级Java多线程程序员并不知道这种行为ThreadPoolExecutor.
现在我有一个特定的用例,这不是最佳的.我正在寻找方法,而不是编写我自己的TPE课程来解决它.
我的要求是针对可能不可靠的第三方回拨的Web服务.
newFixedThreadPool(...)大量的线程,大多数都处于休眠状态.newCachedThreadPool().在更多线程启动之前,如何解决ThreadPoolExecutor队列需要限制和填充的限制?如何让它在排队任务之前启动更多线程?
编辑:
@Flavio提出了使用ThreadPoolExecutor.allowCoreThreadTimeOut(true)核心线程超时并退出的好处.我考虑过这一点,但我仍然想要核心线程功能.如果可能的话,我不希望池中的线程数降到核心大小以下.
Gra*_*ray 44
ThreadPoolExecutor在更多线程启动之前,如何解决队列需要限制和填充的限制问题.
我相信我终于找到了一个有点优雅(也许有点hacky)解决这个限制的方法ThreadPoolExecutor.它包括扩展LinkedBlockingQueue使其返回false的queue.offer(...)时候,已经有一些任务排队.如果当前线程没有跟上排队任务,TPE将添加其他线程.如果池已经处于最大线程,则将RejectedExecutionHandler调用该池.它是处理程序然后put(...)进入队列.
编写一个offer(...)可以返回false并put()永不阻塞的队列当然很奇怪,这就是hack部分.但这适用于TPE对队列的使用,所以我没有看到这样做有任何问题.
这是代码:
// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
private static final long serialVersionUID = -6903933921423432194L;
@Override
public boolean offer(Runnable e) {
/*
* Offer it to the queue if there is 0 items already queued, else
* return false so the TPE will add another thread. If we return false
* and max threads have been reached then the RejectedExecutionHandler
* will be called which will do the put into the queue.
*/
if (size() == 0) {
return super.offer(e);
} else {
return false;
}
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
/*
* This does the actual put into the queue. Once the max threads
* have been reached, the tasks will then queue up.
*/
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
});
Run Code Online (Sandbox Code Playgroud)
有了这个机制,当我向队列提交任务时,ThreadPoolExecutor意志:
offer(...)则返回false.RejectedExecutionHandlerRejectedExecutionHandler随后把任务分成由FIFO顺序第一个可用线程处理队列中.虽然在上面的示例代码中,队列是无限制的,但您也可以将其定义为有界队列.例如,如果您将容量添加到1000,LinkedBlockingQueue那么它将:
另外,如果你真的需要offer(...)在RejectedExecutionHandler那时使用,
你可以使用该offer(E, long, TimeUnit)方法而不是Long.MAX_VALUE超时.
编辑:
我ourQueue.setThreadPoolExecutor(tpe);根据@Ralf的反馈调整了我的方法覆盖.如果它们没有跟上负载,这只会扩大池中的线程数.
编辑:
对这个答案的另一个调整可能是实际询问TPE是否存在空闲线程并且仅在存在该项目时才将其排队.你必须为此创建一个真正的类并在其offer(...)上添加一个方法.
然后您的tpe.getPoolSize() == tpe.getMaximumPoolSize()方法可能类似于:
super.offer(...)在哪种情况下只是打电话tpe.getPoolSize() > tpe.getActiveCount().super.offer(...)然后调用,false因为似乎有空闲线程.volatilefork另一个线程.也许这个:
int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
return super.offer(e);
} else {
return false;
}
Run Code Online (Sandbox Code Playgroud)
请注意,TPE上的get方法很昂贵,因为它们访问getActiveCount()字段或(在这种情况下ThreadPoolExecutor)锁定TPE并遍历线程列表.此外,这里存在竞争条件,可能导致任务不正确地排队,或者在有空闲线程时分叉另一个线程.
RTS*_*lio 26
我已经在这个问题上得到了另外两个答案,但我怀疑这个答案是最好的.
它基于当前接受的答案的技术,即:
offer()方法(有时)返回false,ThreadPoolExecutor生成新线程或拒绝任务,并且RejectedExecutionHandler以实际排队上拒绝的任务.问题是什么时候offer()应该返回false.当队列上有几个任务时,当前接受的答案返回false,但正如我在那里的评论中所指出的,这会导致不良影响.或者,如果总是返回false,即使线程在队列中等待,您也将继续生成新线程.
解决方案是使用Java 7 LinkedTransferQueue并进行offer()调用tryTransfer().当有一个等待的消费者线程时,该任务将被传递给该线程.否则,offer()将返回false并且ThreadPoolExecutor将生成一个新线程.
BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
@Override
public boolean offer(Runnable e) {
return tryTransfer(e);
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
Run Code Online (Sandbox Code Playgroud)
Fla*_*vio 25
将核心大小和最大大小设置为相同的值,并允许从池中删除核心线程allowCoreThreadTimeOut(true).
注意:我现在更喜欢并推荐我的其他答案.
这是一个对我来说更简单的版本:每当执行新任务时增加corePoolSize(最大为maximumPoolSize的限制),然后每当a执行新任务时减少corePoolSize(降低到用户指定的"核心池大小"的限制)任务完成.
换句话说,跟踪正在运行或排队的任务的数量,并确保corePoolSize等于任务数量,只要它在用户指定的"核心池大小"和maximumPoolSize之间.
public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
private int userSpecifiedCorePoolSize;
private int taskCount;
public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
userSpecifiedCorePoolSize = corePoolSize;
}
@Override
public void execute(Runnable runnable) {
synchronized (this) {
taskCount++;
setCorePoolSizeToTaskCountWithinBounds();
}
super.execute(runnable);
}
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
super.afterExecute(runnable, throwable);
synchronized (this) {
taskCount--;
setCorePoolSizeToTaskCountWithinBounds();
}
}
private void setCorePoolSizeToTaskCountWithinBounds() {
int threads = taskCount;
if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
setCorePoolSize(threads);
}
}
Run Code Online (Sandbox Code Playgroud)
如上所述,该类不支持在构造后更改用户指定的corePoolSize或maximumPoolSize,并且不支持直接或通过remove()或操作工作队列purge().
我们有一个子类ThreadPoolExecutor,需要额外的creationThreshold和覆盖execute.
public void execute(Runnable command) {
super.execute(command);
final int poolSize = getPoolSize();
if (poolSize < getMaximumPoolSize()) {
if (getQueue().size() > creationThreshold) {
synchronized (this) {
setCorePoolSize(poolSize + 1);
setCorePoolSize(poolSize);
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
也许这也有帮助,但你的当然看起来更有艺术气息......