ExecutorService 动态扩展线程数

Jac*_*ack 4 java multithreading java.util.concurrent

我有一个工作单元列表,我想并行处理它们。每个单元工作时间为 8-15 秒,完全计算时间,无 I/O 阻塞。我想要实现的是ExecutorService

  • 当没有工作要做时实例化零个线程
  • 如果需要,可以动态扩展到 20 个线程
  • 允许我一次添加所有工作单位(不阻止提交)

就像是:

Queue<WorkResult> queue = new ConcurrentLinkedDeque<>();
ExecutorService service = ....
for(WorkUnit unit : list) {
    service.submit(() -> {
        .. do some work ..
        queue.offer(result);
    );
}
while(queue.peek() != null) {
    ... process results while they arrive ...
}
Run Code Online (Sandbox Code Playgroud)

我尝试但没有成功的是:

  • 使用 anewCachedThreadPool()创建了太多线程
  • 然后我使用了它的内部调用new ThreadPoolExecutor(0, 20, 60L, SECONDS, new SynchronousQueue<>()),但后来我注意到由于同步队列,submit() 被阻塞
  • 所以我用过new LinkedBlockingQueue(),只是为了发现 ThreadPoolExecutor 只生成一个线程

我确信有官方的实现来处理这个非常基本的并发用例。有人可以建议吗?

ara*_*ran 7

ThreadPoolExecutor使用 aLinkedBlockingQueue20as创建corePoolSize构造函数中的第一个参数):

new ThreadPoolExecutor(20, 20, 60L, SECONDS, new LinkedBlockingQueue<>());


如果您在没有预定义容量的情况下使用,则:LinkedBlockingQueue Pool

  • 永远不会检查maxPoolSize
  • 不会corePoolSize创建比指定数量更多的线程。

在您的情况下,只会执行一个线程。你很幸运能得到一个,因为你将其设置为0,并且如果将其设置为(他们怎么敢?),以前版本的 Java(<1.6) 将不会创建任何内容corePoolSize0

进一步的版本确实会创建一个新线程,即使是corePoolSize0这看起来像是……一个修复……一个错误……改变了……逻辑行为

线程池执行器

当所有 corePoolSize 线程都忙时,使用无界队列(例如LinkedBlockingQueue没有预定义容量的队列)将导致新任务在队列中等待。因此,创建的线程不会超过 corePoolSize(因此,maximumPoolSize 的值没有任何影响。)


关于缩小规模

为了在没有工作要做的情况下删除所有线程,您必须coreThreads专门关闭线程(默认情况下它们不会终止)。要实现此目的,请 allowCoreThreadTimeOut(true)在启动之前进行设置Pool

请注意设置正确的keep-alive超时:例如,如果平均每 6 秒收到一个新任务,则将保持活动时间设置为 5 秒可能会导致不必要的擦除+创建操作(哦亲爱的线程,您只需等一下!)。根据任务接收收入速度设置该超时时间。

allowCoreThreadTimeOut

设置策略,控制如果在保持活动时间内没有任务到达,核心线程是否可能超时并终止,并在新任务到达时根据需要进行替换。当为 false 时,核心线程永远不会由于缺少传入任务而终止。如果为 true,则适用于非核心线程的相同保持活动策略也适用于核心线程。为了避免不断的线程替换,设置 true 时保持活动时间必须大于零。通常应在主动使用池之前调用此方法。


TL/DR

  • LinkedBloquingQueue作为任务队列无界。
  • corePoolSize替换maxPoolSize意思
  • allowCoreThreadTimeOut(true)为了允许使用基于超时的机制Pool来缩小规模,该机制也会影响.coreThreads
  • keep-alive根据任务接收延迟将值设置为逻辑值。

这种新的混合将导致ExecutorService99,99999% 的时间不会阻塞提交者 (要发生这种情况,排队的任务数应该是2.147.483.647,并且有效地扩展了基础中的线程数工作负载的变化,在并发线程之间波动(双向) { 0 <--> corePoolSize }

作为建议,应该监视队列的大小,因为非阻塞行为是有代价的:OOM如果它在不受控制的情况下持续增长,则获得异常的概率,直到INTEGER.MAX_VALUE满足(fe:如果线程死锁一整天,而提交者不断插入任务)即使任务在内存中的大小可能很小,2.147.483.647 对象及其相应的链接包装器等......也是很多额外的负载。