and*_*att 9 java multithreading jdk1.6 keep-alive threadpool
我一直在使用ThreadPoolExecutor和JDK6来解决线程池的不同策略.我有一个优先级队列工作,但不知道我是否喜欢keepAliveTime之后池的大小(你得到的是无限队列).所以,我正在使用LinkedBlockingQueue和CallerRuns策略查看ThreadPoolExecutor.
我现在遇到的问题是,池正在升级,因为文档解释了它应该,但是在任务完成并且keepAliveTime发挥作用后,getPoolSize显示池减少到零.下面的示例代码可以让您看到我的问题的基础:
public class ThreadPoolingDemo {
private final static Logger LOGGER =
Logger.getLogger(ThreadPoolingDemo.class.getName());
public static void main(String[] args) throws Exception {
LOGGER.info("MAIN THREAD:starting");
runCallerTestPlain();
}
private static void runCallerTestPlain() throws InterruptedException {
//10 core threads,
//50 max pool size,
//100 tasks in queue,
//at max pool and full queue - caller runs task
ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 50,
5L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100),
new ThreadPoolExecutor.CallerRunsPolicy());
//dump 5000 tasks on the queue
for (int i = 0; i < 5000; i++) {
tpe.submit(new Runnable() {
@Override
public void run() {
//just to eat some time and give a little feedback
for (int j = 0; j < 20; j++) {
LOGGER.info("First-batch Task, looping:" + j + "["
+ Thread.currentThread().getId() + "]");
}
}
}, null);
}
LOGGER.info("MAIN THREAD:!!Done queueing!!");
//check tpe statistics forever
while (true) {
LOGGER.info("Active count: " + tpe.getActiveCount() + " Pool size: "
+ tpe.getPoolSize() + " Largest Pool: " + tpe.getLargestPoolSize());
Thread.sleep(1000);
}
}
}
Run Code Online (Sandbox Code Playgroud)
我发现了一个似乎是这个问题的旧bug,但它已关闭:http: //bugs.sun.com/bugdatabase/view_bug.do?video_id = 6458662.这可能仍然存在于1.6或我错过了什么?
它看起来像我在这个橡皮鸭(http://www.codinghorror.com/blog/2012/03/rubber-duck-problem-solving.html).我上面链接的错误与此相关:http : //bugs.sun.com/bugdatabase/view_bug.do?bug_id=6576792,其中的问题似乎在1.7中得到解决(我加载了1.7并经过验证 - 已修复. ..).我想我的主要问题是这个基本问题已经持续了近十年.我花了太多时间写这篇文章,现在不发布,希望它可以帮助别人.
...任务完成后,keepAliveTime开始运行,getPoolSize显示池减少到零.
所以这看起来是一个竞争条件ThreadPoolExecutor
.我想这是根据设计工作,虽然没有预料到.在getTask()
工作线程循环以从阻塞队列中获取任务的方法中,您会看到以下代码:
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
Run Code Online (Sandbox Code Playgroud)
如果poolSize
以上的增长corePoolSize
那么如果出来后,投票时间keepAliveTime
,代码下降到workerCanExit()
自r
IS null
.所有线程都可以true
从该方法返回,因为它只是测试以下状态poolSize
:
mainLock.lock();
boolean canExit;
try {
canExit = runState >= STOP ||
workQueue.isEmpty() ||
(allowCoreThreadTimeOut &&
poolSize > Math.max(1, corePoolSize)); << test poolSize here
} finally {
mainLock.unlock(); << race to workerDone() begins
}
Run Code Online (Sandbox Code Playgroud)
一旦返回true
,然后工作线程退出,然后将poolSize
递减.如果所有工作线程同时进行该测试,那么它们将全部退出,因为在发生测试poolSize
和停止工作之间的竞争--poolSize
.
令我惊讶的是这种竞争条件的一致性.如果你sleep()
在run()
下面的内部添加一些随机化,那么你可以得到一些核心线程不退出,但我认为竞争条件会更难以击中.
您可以在以下测试中看到此行为:
@Test
public void test() throws Exception {
int before = Thread.activeCount();
int core = 10;
int max = 50;
int queueSize = 100;
ThreadPoolExecutor tpe =
new ThreadPoolExecutor(core, max, 1L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueSize),
new ThreadPoolExecutor.CallerRunsPolicy());
tpe.allowCoreThreadTimeOut(false);
assertEquals(0, tpe.getActiveCount());
// if we start 1 more than can go into core or queue, poolSize goes to 0
int startN = core + queueSize + 1;
// if we only start jobs the core can take care of, then it won't go to 0
// int startN = core + queueSize;
for (int i = 0; i < startN; i++) {
tpe.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
while (true) {
System.out.println("active = " + tpe.getActiveCount() + ", poolSize = " + tpe.getPoolSize()
+ ", largest = " + tpe.getLargestPoolSize() + ", threads = " + (Thread.activeCount() - before));
Thread.sleep(1000);
}
}
Run Code Online (Sandbox Code Playgroud)
如果sleep
将run()
方法内部的行更改为以下内容:
private final Random random = new Random();
...
Thread.sleep(100 + random.nextInt(100));
Run Code Online (Sandbox Code Playgroud)
这将使竞争条件更难以击中,因此一些核心线程仍将存在.