我希望有一个ThreadPoolExecutor我可以设置a corePoolSize和a的地方maximumPoolSize,会发生什么是队列会立即将任务交给线程池,从而创建新线程,直到它到达maximumPoolSize然后开始添加到队列.
有这样的事吗?如果没有,有没有什么好的理由没有这样的策略?
我真正想要的是将任务提交执行,当它达到一个点,在这一点上,由于拥有太多线程(通过设置maximumPoolSize),它实际上将获得"最差"的性能,它将停止添加新线程并使用它线程池并开始排队,然后如果队列已满,则拒绝.
当负载恢复时,它可以开始拆除未使用的线程返回corePoolSize.
这在我的应用程序中比在http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html中列出的"三个一般策略"更有意义.
注意:这些实现有些缺陷且不确定。在使用此代码之前,请阅读整个答案和注释。
如何创建一个工作队列,在执行程序低于最大池大小时拒绝项目,并在达到最大值后开始接受它们?
这依赖于记录的行为:
“如果请求无法排队,则会创建一个新线程,除非这超出了最大池大小,在这种情况下,任务将被拒绝。”
public class ExecutorTest
{
private static final int CORE_POOL_SIZE = 2;
private static final int MAXIMUM_POOL_SIZE = 4;
private static final int KEEP_ALIVE_TIME_MS = 5000;
public static void main(String[] args)
{
final SaturateExecutorBlockingQueue workQueue =
new SaturateExecutorBlockingQueue();
final ThreadPoolExecutor executor =
new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME_MS,
TimeUnit.MILLISECONDS,
workQueue);
workQueue.setExecutor(executor);
for (int i = 0; i < 6; i++)
{
final int index = i;
executor.submit(new Runnable()
{
public void run()
{
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println("Runnable " + index
+ " on thread: " + Thread.currentThread());
}
});
}
}
public static class SaturateExecutorBlockingQueue
extends LinkedBlockingQueue<Runnable>
{
private ThreadPoolExecutor executor;
public void setExecutor(ThreadPoolExecutor executor)
{
this.executor = executor;
}
public boolean offer(Runnable e)
{
if (executor.getPoolSize() < executor.getMaximumPoolSize())
{
return false;
}
return super.offer(e);
}
}
}
Run Code Online (Sandbox Code Playgroud)
注意:您的问题让我感到惊讶,因为我预计您所需的行为是配置了 corePoolSize < MaximumPoolSize 的 ThreadPoolExecutor 的默认行为。但正如您所指出的,ThreadPoolExecutor 的 JavaDoc 明确指出了相反的情况。
想法#2
我想我有一个可能稍微好一点的方法。setCorePoolSize它依赖于编码到 中的方法中的副作用行为ThreadPoolExecutor。这个想法是在工作项排队时暂时有条件地增加核心池大小。当增加核心池大小时,ThreadPoolExecutor将立即产生足够的新线程来执行所有排队的(queue.size())任务。然后我们立即减小核心池的大小,这使得线程池在未来的低活动期间自然收缩。这种方法仍然不是完全确定的(例如,池大小可能会增长到最大池大小以上),但我认为几乎在所有情况下它都比第一种策略更好。
具体来说,我认为这种方法比第一种方法更好,因为:
-
public class ExecutorTest2
{
private static final int KEEP_ALIVE_TIME_MS = 5000;
private static final int CORE_POOL_SIZE = 2;
private static final int MAXIMUM_POOL_SIZE = 4;
public static void main(String[] args) throws InterruptedException
{
final SaturateExecutorBlockingQueue workQueue =
new SaturateExecutorBlockingQueue(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE);
final ThreadPoolExecutor executor =
new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME_MS,
TimeUnit.MILLISECONDS,
workQueue);
workQueue.setExecutor(executor);
for (int i = 0; i < 60; i++)
{
final int index = i;
executor.submit(new Runnable()
{
public void run()
{
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println("Runnable " + index
+ " on thread: " + Thread.currentThread()
+ " poolSize: " + executor.getPoolSize());
}
});
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
public static class SaturateExecutorBlockingQueue
extends LinkedBlockingQueue<Runnable>
{
private final int corePoolSize;
private final int maximumPoolSize;
private ThreadPoolExecutor executor;
public SaturateExecutorBlockingQueue(int corePoolSize,
int maximumPoolSize)
{
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
}
public void setExecutor(ThreadPoolExecutor executor)
{
this.executor = executor;
}
public boolean offer(Runnable e)
{
if (super.offer(e) == false)
{
return false;
}
// Uncomment one or both of the below lines to increase
// the likelyhood of the threadpool reusing an existing thread
// vs. spawning a new one.
//Thread.yield();
//Thread.sleep(0);
int currentPoolSize = executor.getPoolSize();
if (currentPoolSize < maximumPoolSize
&& currentPoolSize >= corePoolSize)
{
executor.setCorePoolSize(currentPoolSize + 1);
executor.setCorePoolSize(corePoolSize);
}
return true;
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2180 次 |
| 最近记录: |