ThreadPoolExecutor的getActiveCount()

ina*_*lus 5 java singleton spring multithreading threadpool

我有一个ThreadPoolExecutor,当我调用getActiveCount()时,它似乎对我说谎.我没有做过很多多线程编程,所以也许我做错了.

这是我的TPE

@Override
public void afterPropertiesSet() throws Exception {

    BlockingQueue<Runnable> workQueue;
    int maxQueueLength = threadPoolConfiguration.getMaximumQueueLength();
    if (maxQueueLength == 0) {
        workQueue = new LinkedBlockingQueue<Runnable>();
    } else {
        workQueue = new LinkedBlockingQueue<Runnable>(maxQueueLength);
    }

    pool = new ThreadPoolExecutor(
                   threadPoolConfiguration.getCorePoolSize(),
                   threadPoolConfiguration.getMaximumPoolSize(),
                   threadPoolConfiguration.getKeepAliveTime(),
                   TimeUnit.valueOf(threadPoolConfiguration.getTimeUnit()),
                   workQueue,
                   // Default thread factory creates normal-priority,
                   // non-daemon threads.
                   Executors.defaultThreadFactory(),
                   // Run any rejected task directly in the calling thread.
                   // In this way no records will be lost due to rejection
                   // however, no records will be added to the workQueue
                   // while the calling thread is processing a Task, so set
                   // your queue-size appropriately.
                   //
                   // This also means MaxThreadCount+1 tasks may run
                   // concurrently. If you REALLY want a max of MaxThreadCount
                   // threads don't use this.
                   new ThreadPoolExecutor.CallerRunsPolicy());
}
Run Code Online (Sandbox Code Playgroud)

在这个课程中我还有一个DAO,我将其传递给我的Runnable(FooWorker),如下所示:

@Override
public void addTask(FooRecord record) {
    if (pool == null) {
        throw new FooException(ERROR_THREAD_POOL_CONFIGURATION_NOT_SET);
    }
    pool.execute(new FooWorker(context, calculator, dao, record));
}
Run Code Online (Sandbox Code Playgroud)

FooWorkerrecord通过状态机运行(唯一的非单例)calculator然后将转换发送到数据库dao,如下所示:

public void run() {
    calculator.calculate(record);
    dao.save(record);
}
Run Code Online (Sandbox Code Playgroud)

一旦我的主线程完成创建新任务,我尝试等待确保所有线程成功完成:

while (pool.getActiveCount() > 0) {
    recordHandler.awaitTermination(terminationTimeout, 
                                   terminationTimeoutUnit);
}
Run Code Online (Sandbox Code Playgroud)

我从输出日志中看到的(由于线程可能不可靠)是getActiveCount()太早返回零,而while()循环正在退出,而我的最后一个线程仍在打印输出calculator.

注意我也试过调用pool.shutdown()然后使用,awaitTermination但是下次我的作业运行时池仍然关闭.

我唯一的猜测是,在一个线程中,当我将数据发送到dao(因为它是由主线程中的Spring创建的单例...)时,java正在考虑该线程处于非活动状态,因为(我假设)它处理/等待主线程.

直觉上,仅基于我所看到的,这是我的猜测.但是......真的发生了什么事吗?有没有办法"正确行事"而不在顶部放置手动递增变量,run()在结尾处递减以跟踪线程数?

如果答案是"不要传入dao",那么我不会为每个线程"新"DAO吗?我的过程已经是一个(美丽,高效)的野兽,但那真的很糟糕.

Joa*_*uer 8

作为状态的JavaDocgetActiveCount,它是一个近似值:您不应该基于此做出任何主要的业务逻辑决策.

如果您想等待所有计划任务完成,那么您应该只使用

pool.shutdown();
pool.awaitTermination(terminationTimeout, terminationTimeoutUnit);
Run Code Online (Sandbox Code Playgroud)

如果您需要等待特定任务完成,您应该使用submit()而不是execute()然后检查Future对象是否完成(isDone()如果您想要非阻塞地执行它,或者只是get()在任务完成之前调用哪些块).


nic*_*ild 6

文档建议该方法getActiveCount()ThreadPoolExecutor是不是一个确切的数目:

getActiveCount

public int getActiveCount()

返回正在执行任务的大致线程数.

返回:线程数

就个人而言,当我做这样的多线程工作时,我使用一个变量,我在添加任务时递增,并在我抓取输出时递减.