Java:在某个队列大小之后阻止提交的ExecutorService

Tah*_*tar 73 java concurrency executorservice threadpool

我正在尝试编写一个解决方案,其中单个线程生成可以并行执行的I/O密集型任务.每个任务都有重要的内存数据.所以我希望能够限制暂时待处理的任务数量.

如果我像这样创建ThreadPoolExecutor:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(maxQueue));
Run Code Online (Sandbox Code Playgroud)

然后在队列填满并且所有线程都已忙时executor.submit(callable)抛出RejectedExecutionException.

executor.submit(callable)当队列已满且所有线程都忙时,我该怎么做才能阻塞?

编辑:我试过这个:

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
Run Code Online (Sandbox Code Playgroud)

它有点实现了我想要实现的效果但是以一种不雅的方式(基本上被拒绝的线程在调用线程中运行,因此这阻止了调用线程提交更多).

编辑:(提问后5年)

对于阅读此问题及其答案的任何人,请不要将接受的答案作为一个正确的解决方案.请仔细阅读所有答案和评论.

jta*_*orn 60

我做了同样的事情.诀窍是创建一个BlockingQueue,其中offer()方法实际上是put().(你可以使用你想要的任何基础BlockingQueue impl).

public class LimitedQueue<E> extends LinkedBlockingQueue<E> 
{
    public LimitedQueue(int maxSize)
    {
        super(maxSize);
    }

    @Override
    public boolean offer(E e)
    {
        // turn offer() and add() into a blocking calls (unless interrupted)
        try {
            put(e);
            return true;
        } catch(InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

}
Run Code Online (Sandbox Code Playgroud)

请注意,这仅适用于线程池,corePoolSize==maxPoolSize因此请注意(请参阅注释).

  • 我不同意 - 这改变了ThreadPoolExecutor.execute的行为,这样如果你有一个corePoolSize <maxPoolSize,那么ThreadPoolExecutor逻辑将永远不会在核心之外添加额外的工作者. (6认同)
  • 我不认为这是一个好主意,因为它改变了offer方法的协议.Offer方法应该是非阻塞调用. (5认同)
  • 澄清一下 - 只要您将约束保存在`corePoolSize == maxPoolSize`中,您的解决方案就会起作用.没有它,它不再让ThreadPoolExecutor具有设计的行为.我一直在寻找解决这个没有限制的问题的解决方案; 我们最终采取的方法见下面的替代答案. (5认同)
  • 或者你可以扩展SynchronousQueue以防止缓冲,只允许直接切换. (2认同)

cva*_*cca 15

以下是我在最后解决这个问题的方法:

(注意:此解决方案会阻止提交Callable的线程,因此它会阻止抛出RejectedExecutionException)

public class BoundedExecutor extends ThreadPoolExecutor{

    private final Semaphore semaphore;

    public BoundedExecutor(int bound) {
        super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        semaphore = new Semaphore(bound);
    }

    /**Submits task to execution pool, but blocks while number of running threads 
     * has reached the bound limit
     */
    public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{

        semaphore.acquire();            
        return submit(task);                    
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);

        semaphore.release();
    }
}
Run Code Online (Sandbox Code Playgroud)

  • afterExecute 由运行任务的同一个线程调用,因此它还没有完成。自己做测试。实施该解决方案,并在执行者身上投入大量工作,如果工作被拒绝则抛出。你会注意到,是的,这有一个竞争条件,并且不难重现它。 (2认同)
  • 转到 ThreadPoolExecutor 并检查 runWorker(Worker w) 方法。您会看到 afterExecute 完成后发生的事情,包括解锁工作线程和增加已完成任务的数量。因此,您允许任务进入(通过释放信号量)而无需使用带宽来处理它们(通过调用 processWorkerExit)。 (2认同)

Kre*_*ase 10

当前接受的答案有一个潜在的重大问题 - 它改变了ThreadPoolExecutor.execute的行为,这样如果你有一个corePoolSize < maxPoolSize,ThreadPoolExecutor逻辑永远不会在核心之外添加额外的工作者.

来自ThreadPoolExecutor .execute(Runnable):

    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
Run Code Online (Sandbox Code Playgroud)

具体来说,最后一个'else'块永远不会被击中.

更好的选择是做类似于OP已经在做的事情 - 使用RejectedExecutionHandler来做同样的put逻辑:

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    try {
        if (!executor.isShutdown()) {
            executor.getQueue().put(r);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e);
    }
}
Run Code Online (Sandbox Code Playgroud)

正如评论中指出的那样(参考这个答案),这种方法有一些值得注意的事项:

  1. 如果corePoolSize==0,则存在竞争条件,其中池中的所有线程可能在任务可见之前死亡
  2. 使用包装队列任务(不适用于ThreadPoolExecutor)的实现将导致问题,除非处理程序也以相同的方式包装它.

记住这些问题,这个解决方案适用于大多数典型的ThreadPoolExecutors,并将正确处理这种情况corePoolSize < maxPoolSize.

  • 我没有downvote,但它似乎是[一个非常糟糕的主意](http://stackoverflow.com/a/3518588/3080094) (2认同)

beg*_*er_ 5

我知道这是一个老问题,但有一个类似的问题,即创建新任务非常快,如果由于现有任务完成得不够快而导致 OutOfMemoryError 发生过多。

在我的情况下Callables,我需要结果,因此我需要存储所有Futures返回的executor.submit(). 我的解决方案是将其Futures放入BlockingQueue最大尺寸的 a 中。一旦该队列已满,在完成某些任务(从队列中删除元素)之前,不会生成更多任务。在伪代码中:

final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads);
final LinkedBlockingQueue<Future> futures = new LinkedBlockingQueue<>(maxQueueSize);
try {   
    Thread taskGenerator = new Thread() {
        @Override
        public void run() {
            while (reader.hasNext) {
                Callable task = generateTask(reader.next());
                Future future = executor.submit(task);
                try {
                    // if queue is full blocks until a task
                    // is completed and hence no future tasks are submitted.
                    futures.put(future);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();         
                }
            }
        executor.shutdown();
        }
    }
    taskGenerator.start();
    
    // read from queue as long as task are being generated
    // or while Queue has elements in it
    while (taskGenerator.isAlive()
                    || !futures.isEmpty()) {
        Future future = futures.take();
        // do something
    }
} catch (InterruptedException ex) {
    Thread.currentThread().interrupt();     
} catch (ExecutionException ex) {
    throw new MyException(ex);
} finally {
    executor.shutdownNow();
}
Run Code Online (Sandbox Code Playgroud)


小智 5

CallerBlocksPolicy如果您正在使用 spring 集成,那么使用该类怎么样?

此类实现该RejectedExecutionHandler接口,该接口是无法由ThreadPoolExecutor.

您可以像这样使用该策略。

executor.setRejectedExecutionHandler(new CallerBlocksPolicy());
Run Code Online (Sandbox Code Playgroud)

CallerBlocksPolicy和之间的主要区别CallerRunsPolicy在于它是阻塞还是运行调用者线程中的任务。

请参考这段代码