创建动态(增长/缩小)线程池

Ola*_*hat 15 java multithreading threadpool

我需要在Java中实现一个线程池(java.util.concurrent),其空闲时线程数达到某个最小值,当作业提交到它上面的速度超过完成执行时,它会增长到上限(但绝不会更远) ,当所有作业完成并且不再提交任何作业时,缩小回到下限.

你会如何实现这样的东西?我想这将是一个相当常见的使用场景,但显然java.util.concurrent.Executors工厂方法只能创建固定大小的池和池,当提交许多作业时,这些池和池无限增长.本ThreadPoolExecutor类提供corePoolSizemaximumPoolSize参数,但它的文档似乎暗示,只有这样,才能不断有超过corePoolSize线程在同一时间使用有界作业队列,在这种情况下,如果你已经达到maximumPoolSize线程,你会得到工作拒绝你必须自己处理?我想出了这个:

//pool creation
ExecutorService pool = new ThreadPoolExecutor(minSize, maxSize, 500, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<Runnable>(minSize));
...

//submitting jobs
for (Runnable job : ...) {
    while (true) {
        try {
            pool.submit(job);
            System.out.println("Job " + job + ": submitted");
            break;
        } catch (RejectedExecutionException e) {
            // maxSize jobs executing concurrently atm.; re-submit new job after short wait
            System.out.println("Job " + job + ": rejected...");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e1) {
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我忽略了什么吗?有一个更好的方法吗?此外,根据一个人的要求,上述代码至少(我认为)(total number of jobs) - maxSize工作完成后才会完成,这可能会有问题.因此,如果您希望能够将任意数量的作业提交到池中并立即继续而无需等待其中任何一个完成,我看不出如何在没有专门的"作业总结"线程的情况下执行此操作保存所有已提交作业所需的无限队列.AFAICS,如果你为ThreadPoolExecutor本身使用一个无界的队列,它的线程数将永远不会超过corePoolSize.

Kum*_*tra 9

当增长和缩小与线程一起出现时,只有一个名字出现在我的脑海中:来自java.util.concurrent包的CachedThreadPool.

ExecutorService executor = Executors.newCachedThreadPool();
Run Code Online (Sandbox Code Playgroud)

CachedThreadPool()可以重用该线程,并在需要时创建新线程.是的,如果一个线程闲置60秒,CachedThreadPool将终止它.所以这非常轻巧 - 用你的话说成长和缩小!

  • 是的,但没有限制. (9认同)
  • 您可以手动甚至在运行时使用底层 ThreadPoolExecutor 并设置 maximumPoolSize (2认同)

Gra*_*ray 5

可能会帮助您的一个技巧是分配一个RejectedExecutionHandler使用相同线程的作业,以将作业提交到阻塞队列。这将阻塞当前线程并消除某种循环的需要。

在这里查看我的答案:

如果需要处理的数据太多,如何使ThreadPoolExecutor命令等待?

这是从该答案中复制的拒绝处理程序。

final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
       0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will call the rejected
// handler when you submit the 201st job, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      // this will block if the queue is full
      executor.getQueue().put(r);
   }
});
Run Code Online (Sandbox Code Playgroud)

然后,只要您意识到在核心线程上方创建任何线程之前首先使用的有界阻塞队列已满,就应该能够利用核心/最大线程计数。因此,如果您有10个核心线程,并且希望第11个作业启动第11个线程,那么您将需要具有大小为0的阻塞队列(可能是a SynchronousQueue)。我觉得这是本来不错的ExecutorService课程的真正限制。