Tah*_*tar 73 java concurrency executorservice threadpool
我正在尝试编写一个解决方案,其中单个线程生成可以并行执行的I/O密集型任务.每个任务都有重要的内存数据.所以我希望能够限制暂时待处理的任务数量.
如果我像这样创建ThreadPoolExecutor:
ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(maxQueue));
Run Code Online (Sandbox Code Playgroud)
然后在队列填满并且所有线程都已忙时executor.submit(callable)抛出RejectedExecutionException.
executor.submit(callable)当队列已满且所有线程都忙时,我该怎么做才能阻塞?
编辑:我试过这个:
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
Run Code Online (Sandbox Code Playgroud)
它有点实现了我想要实现的效果但是以一种不雅的方式(基本上被拒绝的线程在调用线程中运行,因此这阻止了调用线程提交更多).
编辑:(提问后5年)
对于阅读此问题及其答案的任何人,请不要将接受的答案作为一个正确的解决方案.请仔细阅读所有答案和评论.
jta*_*orn 60
我做了同样的事情.诀窍是创建一个BlockingQueue,其中offer()方法实际上是put().(你可以使用你想要的任何基础BlockingQueue impl).
public class LimitedQueue<E> extends LinkedBlockingQueue<E>
{
public LimitedQueue(int maxSize)
{
super(maxSize);
}
@Override
public boolean offer(E e)
{
// turn offer() and add() into a blocking calls (unless interrupted)
try {
put(e);
return true;
} catch(InterruptedException ie) {
Thread.currentThread().interrupt();
}
return false;
}
}
Run Code Online (Sandbox Code Playgroud)
请注意,这仅适用于线程池,corePoolSize==maxPoolSize因此请注意(请参阅注释).
cva*_*cca 15
以下是我在最后解决这个问题的方法:
(注意:此解决方案会阻止提交Callable的线程,因此它会阻止抛出RejectedExecutionException)
public class BoundedExecutor extends ThreadPoolExecutor{
private final Semaphore semaphore;
public BoundedExecutor(int bound) {
super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
semaphore = new Semaphore(bound);
}
/**Submits task to execution pool, but blocks while number of running threads
* has reached the bound limit
*/
public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{
semaphore.acquire();
return submit(task);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
semaphore.release();
}
}
Run Code Online (Sandbox Code Playgroud)
Kre*_*ase 10
当前接受的答案有一个潜在的重大问题 - 它改变了ThreadPoolExecutor.execute的行为,这样如果你有一个corePoolSize < maxPoolSize,ThreadPoolExecutor逻辑永远不会在核心之外添加额外的工作者.
来自ThreadPoolExecutor .execute(Runnable):
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
Run Code Online (Sandbox Code Playgroud)
具体来说,最后一个'else'块永远不会被击中.
更好的选择是做类似于OP已经在做的事情 - 使用RejectedExecutionHandler来做同样的put逻辑:
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
if (!executor.isShutdown()) {
executor.getQueue().put(r);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e);
}
}
Run Code Online (Sandbox Code Playgroud)
正如评论中指出的那样(参考这个答案),这种方法有一些值得注意的事项:
corePoolSize==0,则存在竞争条件,其中池中的所有线程可能在任务可见之前死亡ThreadPoolExecutor)的实现将导致问题,除非处理程序也以相同的方式包装它.记住这些问题,这个解决方案适用于大多数典型的ThreadPoolExecutors,并将正确处理这种情况corePoolSize < maxPoolSize.
我知道这是一个老问题,但有一个类似的问题,即创建新任务非常快,如果由于现有任务完成得不够快而导致 OutOfMemoryError 发生过多。
在我的情况下Callables,我需要结果,因此我需要存储所有Futures返回的executor.submit(). 我的解决方案是将其Futures放入BlockingQueue最大尺寸的 a 中。一旦该队列已满,在完成某些任务(从队列中删除元素)之前,不会生成更多任务。在伪代码中:
final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads);
final LinkedBlockingQueue<Future> futures = new LinkedBlockingQueue<>(maxQueueSize);
try {
Thread taskGenerator = new Thread() {
@Override
public void run() {
while (reader.hasNext) {
Callable task = generateTask(reader.next());
Future future = executor.submit(task);
try {
// if queue is full blocks until a task
// is completed and hence no future tasks are submitted.
futures.put(future);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
executor.shutdown();
}
}
taskGenerator.start();
// read from queue as long as task are being generated
// or while Queue has elements in it
while (taskGenerator.isAlive()
|| !futures.isEmpty()) {
Future future = futures.take();
// do something
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (ExecutionException ex) {
throw new MyException(ex);
} finally {
executor.shutdownNow();
}
Run Code Online (Sandbox Code Playgroud)
小智 5
CallerBlocksPolicy如果您正在使用 spring 集成,那么使用该类怎么样?
此类实现该RejectedExecutionHandler接口,该接口是无法由ThreadPoolExecutor.
您可以像这样使用该策略。
executor.setRejectedExecutionHandler(new CallerBlocksPolicy());
Run Code Online (Sandbox Code Playgroud)
CallerBlocksPolicy和之间的主要区别CallerRunsPolicy在于它是阻塞还是运行调用者线程中的任务。
请参考这段代码。
| 归档时间: |
|
| 查看次数: |
35607 次 |
| 最近记录: |