如果ThreadPoolExecutor的commit()方法已经饱和,如何阻止它?

Fix*_*int 99 java concurrency executor

我想创建一个ThreadPoolExecutor当它达到其最大大小并且队列已满时,该submit()方法在尝试添加新任务时阻塞.我是否需要为此实现自定义RejectedExecutionHandler,或者是否存在使用标准Java库执行此操作的方法?

Fix*_*int 45

我刚刚发现的一种可能的解决方案:

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

还有其他解决方案吗?我更喜欢基于RejectedExecutionHandler它的东西,因为它似乎是处理这种情况的标准方法.

  • 这个答案并不完全正确,也是评论.这段代码确实来自Java Concurrency in Practice,如果你把它的上下文考虑在内,它是正确的.本书明确指出:"在这种方法中,使用**无界**队列(...)并将信号量上的边界设置为等于池大小加上您想要允许的排队任务数量".使用无界限队列,任务永远不会被拒绝,因此重新抛出异常完全没用!我相信,这也是为什么`throw e;`在本书中不是****的原因.JCIP是对的! (11认同)
  • 在finally子句中释放信号量并获取信号量之间是否存在竞争条件? (2认同)
  • 如上所述,此实现存在缺陷,因为信号量在任务完成之前被释放.最好使用方法java.util.concurrent.ThreadPoolExecutor #afterExecute(Runnable,Throwable) (2认同)
  • @FelixM:使用java.util.concurrent.ThreadPoolExecutor #afterExecute(Runnable,Throwable)将无法解决问题,因为在java.util.concurrent.ThreadPoolExecutor#runWorker(Worker w)中的task.run()之后立即调用afterExecute,之前从队列中获取下一个元素(查看openjdk 1.7.0.6的源代码). (2认同)
  • 这个答案来自 Brian Goetz 所著的 Java Concurrency in Practice (2认同)

小智 29

您可以使用ThreadPoolExecutor和blockingQueue:

public class ImageManager {
    BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockQueueSize);
    RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
    private ExecutorService executorService =  new ThreadPoolExecutor(numOfThread, numOfThread, 
        0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler);

    private int downloadThumbnail(String fileListPath){
        executorService.submit(new yourRunnable());
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 这会在提交线程上运行被拒绝的任务.哪个功能不符合OP的要求. (54认同)
  • Downvoted:当TPE饱和时,这不会阻止.这只是一种替代方案,而不是解决方案. (4认同)
  • 这确实在"调用线程"中运行任务而不是阻塞将它放在队列上,这会产生一些不利影响,比如多个线程以这种方式调用它,超过"队列大小"作业将运行,如果任务发生的时间比预期的要长,你的"生产"线程可能不会让执行者忙碌.但在这里工作得很好! (3认同)
  • 赞成:这非常适合“TPE 的设计”和“自然地阻止”客户端线程,因为它们可以让它们做溢出测试。这应该涵盖大多数用例,但当然不是所有用例,您应该了解幕后发生的事情。 (2认同)

dan*_*ben 12

您应该使用CallerRunsPolicy,在调用线程中执行被拒绝的任务.这样,在该任务完成之前,它不能向执行程序提交任何新任务,此时将有一些空闲池线程或该进程将重复.

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

来自文档:

被拒绝的任务

当Executor关闭时,以及当Executor对最大线程和工作队列容量使用有限边界并且已经饱和时,将拒绝方法execute(java.lang.Runnable)中提交的新任务.在任何一种情况下,execute方法都会调用其RejectedExecutionHandler的RejectedExecutionHandler.rejectedExecution(java.lang.Runnable,java.util.concurrent.ThreadPoolExecutor)方法.提供了四种预定义的处理程序策

  1. 在默认的ThreadPoolExecutor.AbortPolicy中,处理程序在拒绝时抛出运行时RejectedExecutionException.
  2. 在ThreadPoolExecutor.CallerRunsPolicy中,调用execute本身的线程运行该任务.这提供了一种简单的反馈控制机制,可以降低新任务的提交速度.
  3. 在ThreadPoolExecutor.DiscardPolicy中,简单地删除了无法执行的任务.
  4. 在ThreadPoolExecutor.DiscardOldestPolicy中,如果执行程序未关闭,则会删除工作队列头部的任务,然后重试执行(可能会再次失败,导致重复执行).

此外,在调用ThreadPoolExecutor构造函数时,请确保使用有界队列,例如ArrayBlockingQueue .否则,什么都不会被拒绝.

编辑:响应您的注释,将ArrayBlockingQueue的大小设置为等于线程池的最大大小,并使用AbortPolicy.

编辑2:好的,我知道你得到了什么.怎么样:覆盖beforeExecute()检查getActiveCount()不超过的方法getMaximumPoolSize(),如果是,请睡觉并再试一次?

  • CallerRunPolicy的问题在于,如果你有一个线程生成器,如果长时间运行的任务被拒绝,你通常会没有使用线程(因为线程池中的其他任务将在长时间运行的任务仍然完成时完成运行). (7认同)
  • 我希望将多个并发执行的任务严格限制(通过Executor中的线程数),这就是为什么我不允许调用者线程自己执行这些任务. (3认同)
  • 我要求阻止,而不是睡觉和投票;) (2认同)

the*_*oid 12

请查看以下四种方法:创建NotifyingBlockingThreadPoolExecutor

  • 该页面不再存在. (4认同)

Nat*_*ray 6

Hibernate有一个BlockPolicy简单的,可以做你想要的:

请参阅:Executors.java

/**
 * A handler for rejected tasks that will have the caller block until
 * space is available.
 */
public static class BlockPolicy implements RejectedExecutionHandler {

    /**
     * Creates a <tt>BlockPolicy</tt>.
     */
    public BlockPolicy() { }

    /**
     * Puts the Runnable to the blocking queue, effectively blocking
     * the delegating thread until space is available.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        try {
            e.getQueue().put( r );
        }
        catch (InterruptedException e1) {
            log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r );
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 再想一想,这是一个非常糟糕的主意.我不建议你使用它.有充分理由请看:http://stackoverflow.com/questions/3446011/threadpoolexecutor-block-when-queue-is-full/3518588#3518588 (4认同)
  • 哇,太丑了。基本上,此解决方案会干扰 TPE 的内部结构。`ThreadPoolExecutor` 的 javadoc 甚至从字面上说:“方法 getQueue() 允许访问工作队列以进行监视和调试。强烈建议不要将此方法用于任何其他目的。”。这是被如此众所周知库提供,绝对是悲伤地看到。 (2认同)

ste*_*n f 6

如果对Executor使用无界队列,或者信号量绑定不大于队列大小BoundedExecutor,则Java Concurrency in Practice中引用的答案只能正常工作.信号量是提交线程和池中线程之间共享的状态,即使队列大小<bound <=(队列大小+池大小),也可以使执行器饱和.

使用CallerRunsPolicy仅在您的任务不能永久运行时才有效,在这种情况下,您的提交线程将rejectedExecution永久保留,如果您的任务需要很长时间才能运行,这是个坏主意,因为提交线程无法提交任何新任务或如果它本身正在运行任务,则执行任何其他操作.

如果这是不可接受的,那么我建议在提交任务之前检查执行程序的有界队列的大小.如果队列已满,请等待一小段时间再尝试再次提交.吞吐量会受到影响,但我认为它比许多其他提议的解决方案更简单,你保证不会有任何任务被拒绝.

  • 我不确定在提交之前检查队列长度如何保证在具有多个任务生产者的多线程环境中没有被拒绝的任务。这听起来不是线程安全的。 (2认同)

Jak*_*kub 5

我知道这是一个hack,但是我认为这里提供的大多数hack都是干净的;-)

因为ThreadPoolExecutor使用阻塞队列“ offer”而不是“ put”,所以可以覆盖阻塞队列的“ offer”行为:

class BlockingQueueHack<T> extends ArrayBlockingQueue<T> {

    BlockingQueueHack(int size) {
        super(size);
    }

    public boolean offer(T task) {
        try {
            this.put(task);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return true;
    }
}

ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new BlockingQueueHack(5));
Run Code Online (Sandbox Code Playgroud)

我测试了它,它似乎起作用了。实施超时策略留给读者练习。


Tim*_*mos 5

以下类环绕一个 ThreadPoolExecutor 并使用一个信号量来阻塞,然后工作队列已满:

public final class BlockingExecutor { 

    private final Executor executor;
    private final Semaphore semaphore;

    public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory);
        this.semaphore = new Semaphore(queueSize + maxPoolSize);
    }

    private void execImpl (final Runnable command) throws InterruptedException {
        semaphore.acquire();
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            // will never be thrown with an unbounded buffer (LinkedBlockingQueue)
            semaphore.release();
            throw e;
        }
    }

    public void execute (Runnable command) throws InterruptedException {
        execImpl(command);
    }
}
Run Code Online (Sandbox Code Playgroud)

这个包装类基于 Brian Goetz 所著的 Java Concurrency in Practice 一书中给出的解决方案。书中的解决方案只需要两个构造函数参数:一个Executor和用于信号量的边界。这显示在 Fixpoint 给出的答案中。这种方法有一个问题:它可能会进入池线程繁忙、队列已满、但信号量刚刚释放许可的状态。(semaphore.release()在 finally 块中)。在这种状态下,一个新的任务可以抢到刚刚释放的许可,但由于任务队列已满而被拒绝。当然,这不是你想要的;在这种情况下你想阻止。

为了解决这个问题,我们必须使用无界队列,正如 JCiP 明确提到的。信号量充当守卫,提供虚拟队列大小的效果。这具有副作用,即单元可能包含maxPoolSize + virtualQueueSize + maxPoolSize任务。这是为什么?因为 semaphore.release()在 finally 块中。如果所有池线程同时调用此语句,则maxPoolSize释放许可,允许相同数量的任务进入单元。如果我们使用有界队列,它仍然会满,导致任务被拒绝。现在,因为我们知道这只会在池线程几乎完成时发生,所以这不是问题。我们知道池线程不会阻塞,所以很快就会从队列中取出一个任务。

不过,您可以使用有界队列。只要确保它的大小等于virtualQueueSize + maxPoolSize. 更大的尺寸是无用的,信号量会阻止让更多的项目进入。更小的尺寸将导致被拒绝的任务。任务被拒绝的机会随着规模的减小而增加。例如,假设您想要一个 maxPoolSize=2 和 virtualQueueSize=5 的有界执行器。然后取一个具有 5+2=7 许可和实际队列大小为 5+2=7 的信号量。单元中可以包含的实际任务数为 2+5+2=9。当 executor 已满(队列中有 5 个任务,线程池中有 2 个任务,因此有 0 个可用的许可)并且所有池线程都释放了它们的许可时,那么进入的任务可以占用 2 个许可。

现在来自 JCiP 的解决方案使用起来有些麻烦,因为它没有强制执行所有这些约束(无界队列,或受这些数学限制等)。我认为这只是一个很好的例子来演示如何基于已经可用的部分构建新的线程安全类,而不是作为一个成熟的、可重用的类。我不认为后者是作者的意图。