为什么ThreadPoolExecutor会在keepAliveTime之后减少corePoolSize以下的线程?

and*_*att 9 java multithreading jdk1.6 keep-alive threadpool

我一直在使用ThreadPoolExecutor和JDK6来解决线程池的不同策略.我有一个优先级队列工作,但不知道我是否喜欢keepAliveTime之后池的大小(你得到的是无限队列).所以,我正在使用LinkedBlockingQueue和CallerRuns策略查看ThreadPoolExecutor.

我现在遇到的问题是,池正在升级,因为文档解释了它应该,但是在任务完成并且keepAliveTime发挥作用后,getPoolSize显示池减少到零.下面的示例代码可以让您看到我的问题的基础:

public class ThreadPoolingDemo {
    private final static Logger LOGGER =
         Logger.getLogger(ThreadPoolingDemo.class.getName());

    public static void main(String[] args) throws Exception {
        LOGGER.info("MAIN THREAD:starting");
        runCallerTestPlain();   
    }

    private static void runCallerTestPlain() throws InterruptedException {
        //10 core threads, 
        //50 max pool size, 
        //100 tasks in queue, 
        //at max pool and full queue - caller runs task
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 50,
            5L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100),
            new ThreadPoolExecutor.CallerRunsPolicy());

        //dump 5000 tasks on the queue
        for (int i = 0; i < 5000; i++) {
            tpe.submit(new Runnable() {
                @Override
                public void run() {
                    //just to eat some time and give a little feedback
                    for (int j = 0; j < 20; j++) {
                        LOGGER.info("First-batch Task, looping:" + j + "["
                               + Thread.currentThread().getId() + "]");
                    }
                }
            }, null);
        }
        LOGGER.info("MAIN THREAD:!!Done queueing!!");

        //check tpe statistics forever
        while (true) {
            LOGGER.info("Active count: " + tpe.getActiveCount() + " Pool size: "
                 + tpe.getPoolSize() + " Largest Pool: " + tpe.getLargestPoolSize());
            Thread.sleep(1000);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我发现了一个似乎是这个问题的旧bug,但它已关闭:http: //bugs.sun.com/bugdatabase/view_bug.do?video_id = 6458662.这可能仍然存在于1.6或我错过了什么?

它看起来像我在这个橡皮鸭(http://www.codinghorror.com/blog/2012/03/rubber-duck-problem-solving.html).我上面链接的错误与此相关:http : //bugs.sun.com/bugdatabase/view_bug.do?bug_id=6576792,其中的问题似乎在1.7中得到解决(我加载了1.7并经过验证 - 已修复. ..).我想我的主要问题是这个基本问题已经持续了近十年.我花了太多时间写这篇文章,现在不发布,希望它可以帮助别人.

Gra*_*ray 6

...任务完成后,keepAliveTime开始运行,getPoolSize显示池减少到零.

所以这看起来是一个竞争条件ThreadPoolExecutor.我想这是根据设计工作,虽然没有预料到.在getTask()工作线程循环以从阻塞队列中获取任务的方法中,您会看到以下代码:

if (state == SHUTDOWN)  // Help drain queue
    r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
    r = workQueue.take();
if (r != null)
    return r;
if (workerCanExit()) {
    if (runState >= SHUTDOWN) // Wake up others
        interruptIdleWorkers();
    return null;
}
Run Code Online (Sandbox Code Playgroud)

如果poolSize以上的增长corePoolSize那么如果出来后,投票时间keepAliveTime,代码下降到workerCanExit()rIS null.所有线程都可以true从该方法返回,因为它只是测试以下状态poolSize:

    mainLock.lock();
    boolean canExit;
    try {
        canExit = runState >= STOP ||
            workQueue.isEmpty() ||
            (allowCoreThreadTimeOut &&
             poolSize > Math.max(1, corePoolSize)); << test poolSize here
    } finally {
        mainLock.unlock();                         << race to workerDone() begins
    }
Run Code Online (Sandbox Code Playgroud)

一旦返回true,然后工作线程退出,然后poolSize递减.如果所有工作线程同时进行该测试,那么它们将全部退出,因为在发生测试poolSize和停止工作之间的竞争--poolSize.

令我惊讶的是这种竞争条件的一致性.如果你sleep()run()下面的内部添加一些随机化,那么你可以得到一些核心线程不退出,但我认为竞争条件会更难以击中.


您可以在以下测试中看到此行为:

@Test
public void test() throws Exception {
    int before = Thread.activeCount();
    int core = 10;
    int max = 50;
    int queueSize = 100;
    ThreadPoolExecutor tpe =
            new ThreadPoolExecutor(core, max, 1L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(queueSize),
                    new ThreadPoolExecutor.CallerRunsPolicy());
    tpe.allowCoreThreadTimeOut(false);
    assertEquals(0, tpe.getActiveCount());
    // if we start 1 more than can go into core or queue, poolSize goes to 0
    int startN = core + queueSize + 1;
    // if we only start jobs the core can take care of, then it won't go to 0
    // int startN = core + queueSize;
    for (int i = 0; i < startN; i++) {
        tpe.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    while (true) {
        System.out.println("active = " + tpe.getActiveCount() + ", poolSize = " + tpe.getPoolSize()
                + ", largest = " + tpe.getLargestPoolSize() + ", threads = " + (Thread.activeCount() - before));
        Thread.sleep(1000);
    }
}
Run Code Online (Sandbox Code Playgroud)

如果sleeprun()方法内部的行更改为以下内容:

private final Random random = new Random();
...
    Thread.sleep(100 + random.nextInt(100));
Run Code Online (Sandbox Code Playgroud)

这将使竞争条件更难以击中,因此一些核心线程仍将存在.