Fix*_*int 99 java concurrency executor
我想创建一个ThreadPoolExecutor当它达到其最大大小并且队列已满时,该submit()方法在尝试添加新任务时阻塞.我是否需要为此实现自定义RejectedExecutionHandler,或者是否存在使用标准Java库执行此操作的方法?
Fix*_*int 45
我刚刚发现的一种可能的解决方案:
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command)
throws InterruptedException, RejectedExecutionException {
semaphore.acquire();
try {
exec.execute(new Runnable() {
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
throw e;
}
}
}
Run Code Online (Sandbox Code Playgroud)
还有其他解决方案吗?我更喜欢基于RejectedExecutionHandler它的东西,因为它似乎是处理这种情况的标准方法.
小智 29
您可以使用ThreadPoolExecutor和blockingQueue:
public class ImageManager {
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockQueueSize);
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
private ExecutorService executorService = new ThreadPoolExecutor(numOfThread, numOfThread,
0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler);
private int downloadThumbnail(String fileListPath){
executorService.submit(new yourRunnable());
}
}
Run Code Online (Sandbox Code Playgroud)
dan*_*ben 12
您应该使用CallerRunsPolicy,在调用线程中执行被拒绝的任务.这样,在该任务完成之前,它不能向执行程序提交任何新任务,此时将有一些空闲池线程或该进程将重复.
来自文档:
被拒绝的任务
当Executor关闭时,以及当Executor对最大线程和工作队列容量使用有限边界并且已经饱和时,将拒绝方法execute(java.lang.Runnable)中提交的新任务.在任何一种情况下,execute方法都会调用其RejectedExecutionHandler的RejectedExecutionHandler.rejectedExecution(java.lang.Runnable,java.util.concurrent.ThreadPoolExecutor)方法.提供了四种预定义的处理程序策
- 在默认的ThreadPoolExecutor.AbortPolicy中,处理程序在拒绝时抛出运行时RejectedExecutionException.
- 在ThreadPoolExecutor.CallerRunsPolicy中,调用execute本身的线程运行该任务.这提供了一种简单的反馈控制机制,可以降低新任务的提交速度.
- 在ThreadPoolExecutor.DiscardPolicy中,简单地删除了无法执行的任务.
- 在ThreadPoolExecutor.DiscardOldestPolicy中,如果执行程序未关闭,则会删除工作队列头部的任务,然后重试执行(可能会再次失败,导致重复执行).
此外,在调用ThreadPoolExecutor构造函数时,请确保使用有界队列,例如ArrayBlockingQueue .否则,什么都不会被拒绝.
编辑:响应您的注释,将ArrayBlockingQueue的大小设置为等于线程池的最大大小,并使用AbortPolicy.
编辑2:好的,我知道你得到了什么.怎么样:覆盖beforeExecute()检查getActiveCount()不超过的方法getMaximumPoolSize(),如果是,请睡觉并再试一次?
Hibernate有一个BlockPolicy简单的,可以做你想要的:
请参阅:Executors.java
/**
* A handler for rejected tasks that will have the caller block until
* space is available.
*/
public static class BlockPolicy implements RejectedExecutionHandler {
/**
* Creates a <tt>BlockPolicy</tt>.
*/
public BlockPolicy() { }
/**
* Puts the Runnable to the blocking queue, effectively blocking
* the delegating thread until space is available.
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
try {
e.getQueue().put( r );
}
catch (InterruptedException e1) {
log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r );
}
}
}
Run Code Online (Sandbox Code Playgroud)
如果对Executor使用无界队列,或者信号量绑定不大于队列大小BoundedExecutor,则Java Concurrency in Practice中引用的答案只能正常工作.信号量是提交线程和池中线程之间共享的状态,即使队列大小<bound <=(队列大小+池大小),也可以使执行器饱和.
使用CallerRunsPolicy仅在您的任务不能永久运行时才有效,在这种情况下,您的提交线程将rejectedExecution永久保留,如果您的任务需要很长时间才能运行,这是个坏主意,因为提交线程无法提交任何新任务或如果它本身正在运行任务,则执行任何其他操作.
如果这是不可接受的,那么我建议在提交任务之前检查执行程序的有界队列的大小.如果队列已满,请等待一小段时间再尝试再次提交.吞吐量会受到影响,但我认为它比许多其他提议的解决方案更简单,你保证不会有任何任务被拒绝.
我知道这是一个hack,但是我认为这里提供的大多数hack都是干净的;-)
因为ThreadPoolExecutor使用阻塞队列“ offer”而不是“ put”,所以可以覆盖阻塞队列的“ offer”行为:
class BlockingQueueHack<T> extends ArrayBlockingQueue<T> {
BlockingQueueHack(int size) {
super(size);
}
public boolean offer(T task) {
try {
this.put(task);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return true;
}
}
ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new BlockingQueueHack(5));
Run Code Online (Sandbox Code Playgroud)
我测试了它,它似乎起作用了。实施超时策略留给读者练习。
以下类环绕一个 ThreadPoolExecutor 并使用一个信号量来阻塞,然后工作队列已满:
public final class BlockingExecutor {
private final Executor executor;
private final Semaphore semaphore;
public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory);
this.semaphore = new Semaphore(queueSize + maxPoolSize);
}
private void execImpl (final Runnable command) throws InterruptedException {
semaphore.acquire();
try {
executor.execute(new Runnable() {
@Override
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
// will never be thrown with an unbounded buffer (LinkedBlockingQueue)
semaphore.release();
throw e;
}
}
public void execute (Runnable command) throws InterruptedException {
execImpl(command);
}
}
Run Code Online (Sandbox Code Playgroud)
这个包装类基于 Brian Goetz 所著的 Java Concurrency in Practice 一书中给出的解决方案。书中的解决方案只需要两个构造函数参数:一个Executor和用于信号量的边界。这显示在 Fixpoint 给出的答案中。这种方法有一个问题:它可能会进入池线程繁忙、队列已满、但信号量刚刚释放许可的状态。(semaphore.release()在 finally 块中)。在这种状态下,一个新的任务可以抢到刚刚释放的许可,但由于任务队列已满而被拒绝。当然,这不是你想要的;在这种情况下你想阻止。
为了解决这个问题,我们必须使用无界队列,正如 JCiP 明确提到的。信号量充当守卫,提供虚拟队列大小的效果。这具有副作用,即单元可能包含maxPoolSize + virtualQueueSize + maxPoolSize任务。这是为什么?因为
semaphore.release()在 finally 块中。如果所有池线程同时调用此语句,则maxPoolSize释放许可,允许相同数量的任务进入单元。如果我们使用有界队列,它仍然会满,导致任务被拒绝。现在,因为我们知道这只会在池线程几乎完成时发生,所以这不是问题。我们知道池线程不会阻塞,所以很快就会从队列中取出一个任务。
不过,您可以使用有界队列。只要确保它的大小等于virtualQueueSize + maxPoolSize. 更大的尺寸是无用的,信号量会阻止让更多的项目进入。更小的尺寸将导致被拒绝的任务。任务被拒绝的机会随着规模的减小而增加。例如,假设您想要一个 maxPoolSize=2 和 virtualQueueSize=5 的有界执行器。然后取一个具有 5+2=7 许可和实际队列大小为 5+2=7 的信号量。单元中可以包含的实际任务数为 2+5+2=9。当 executor 已满(队列中有 5 个任务,线程池中有 2 个任务,因此有 0 个可用的许可)并且所有池线程都释放了它们的许可时,那么进入的任务可以占用 2 个许可。
现在来自 JCiP 的解决方案使用起来有些麻烦,因为它没有强制执行所有这些约束(无界队列,或受这些数学限制等)。我认为这只是一个很好的例子来演示如何基于已经可用的部分构建新的线程安全类,而不是作为一个成熟的、可重用的类。我不认为后者是作者的意图。