Jac*_*ack 4 java multithreading java.util.concurrent
我有一个工作单元列表,我想并行处理它们。每个单元工作时间为 8-15 秒,完全计算时间,无 I/O 阻塞。我想要实现的是ExecutorService:
就像是:
Queue<WorkResult> queue = new ConcurrentLinkedDeque<>();
ExecutorService service = ....
for(WorkUnit unit : list) {
service.submit(() -> {
.. do some work ..
queue.offer(result);
);
}
while(queue.peek() != null) {
... process results while they arrive ...
}
Run Code Online (Sandbox Code Playgroud)
我尝试但没有成功的是:
newCachedThreadPool()创建了太多线程new ThreadPoolExecutor(0, 20, 60L, SECONDS, new SynchronousQueue<>()),但后来我注意到由于同步队列,submit() 被阻塞new LinkedBlockingQueue(),只是为了发现 ThreadPoolExecutor 只生成一个线程我确信有官方的实现来处理这个非常基本的并发用例。有人可以建议吗?
ThreadPoolExecutor使用 aLinkedBlockingQueue和 20as创建corePoolSize(构造函数中的第一个参数):
new ThreadPoolExecutor(20, 20, 60L, SECONDS, new LinkedBlockingQueue<>());
如果您在没有预定义容量的情况下使用,则:LinkedBlockingQueue Pool
maxPoolSize。corePoolSize创建比指定数量更多的线程。在您的情况下,只会执行一个线程。你很幸运能得到一个,因为你将其设置为0,并且如果将其设置为(他们怎么敢?),以前版本的 Java(<1.6) 将不会创建任何内容corePoolSize0 。
进一步的版本确实会创建一个新线程,即使是corePoolSize,0这看起来像是……一个修复……一个错误……改变了……逻辑行为?
当所有 corePoolSize 线程都忙时,使用无界队列(例如
LinkedBlockingQueue没有预定义容量的队列)将导致新任务在队列中等待。因此,创建的线程不会超过 corePoolSize。(因此,maximumPoolSize 的值没有任何影响。)
关于缩小规模
为了在没有工作要做的情况下删除所有线程,您必须coreThreads专门关闭线程(默认情况下它们不会终止)。要实现此目的,请 allowCoreThreadTimeOut(true)在启动之前进行设置Pool。
请注意设置正确的keep-alive超时:例如,如果平均每 6 秒收到一个新任务,则将保持活动时间设置为 5 秒可能会导致不必要的擦除+创建操作(哦亲爱的线程,您只需等一下!)。根据任务接收收入速度设置该超时时间。
设置策略,控制如果在保持活动时间内没有任务到达,核心线程是否可能超时并终止,并在新任务到达时根据需要进行替换。当为 false 时,核心线程永远不会由于缺少传入任务而终止。如果为 true,则适用于非核心线程的相同保持活动策略也适用于核心线程。为了避免不断的线程替换,设置 true 时保持活动时间必须大于零。通常应在主动使用池之前调用此方法。
TL/DR
LinkedBloquingQueue作为任务队列无界。corePoolSize替换maxPoolSize的意思。allowCoreThreadTimeOut(true)为了允许使用基于超时的机制Pool来缩小规模,该机制也会影响.coreThreadskeep-alive根据任务接收延迟将值设置为逻辑值。这种新的混合将导致ExecutorService99,99999% 的时间不会阻塞提交者 (要发生这种情况,排队的任务数应该是2.147.483.647),并且有效地扩展了基础中的线程数工作负载的变化,在并发线程之间波动(双向) { 0 <--> corePoolSize }。
作为建议,应该监视队列的大小,因为非阻塞行为是有代价的:OOM如果它在不受控制的情况下持续增长,则获得异常的概率,直到INTEGER.MAX_VALUE满足(fe:如果线程死锁一整天,而提交者不断插入任务)。即使任务在内存中的大小可能很小,2.147.483.647 对象及其相应的链接包装器等......也是很多额外的负载。