ghe*_*ton 53 java concurrency multithreading executorservice executor
我试图使用ThreadPoolExecutor执行许多任务.以下是一个假设的例子:
def workQueue = new ArrayBlockingQueue<Runnable>(3, false)
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue)
for(int i = 0; i < 100000; i++)
threadPoolExecutor.execute(runnable)
Run Code Online (Sandbox Code Playgroud)
问题是我很快得到了java.util.concurrent.RejectedExecutionException,因为任务数量超过了工作队列的大小.但是,我正在寻找的所需行为是让主线程阻塞,直到队列中有空间.完成此任务的最佳方法是什么?
Dar*_*roy 63
在一些非常狭窄的情况下,您可以实现执行所需操作的java.util.concurrent.RejectedExecutionHandler.
RejectedExecutionHandler block = new RejectedExecutionHandler() {
rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
executor.getQueue().put( r );
}
};
ThreadPoolExecutor pool = new ...
pool.setRejectedExecutionHandler(block);
Run Code Online (Sandbox Code Playgroud)
现在.由于以下原因,这是一个非常糟糕的主意
一个几乎总是更好的策略是安装ThreadPoolExecutor.CallerRunsPolicy,它将通过在调用execute()的线程上运行任务来限制你的应用程序.
但是,有时候一个具有所有固有风险的阻止策略实际上就是你想要的.我会说在这些条件下
所以,正如我所说.它很少需要并且可能很危险,但是你去了.
祝好运.
小智 5
您可以使用a semaphore
来阻止线程进入池中.
ExecutorService service = new ThreadPoolExecutor(
3,
3,
1,
TimeUnit.HOURS,
new ArrayBlockingQueue<>(6, false)
);
Semaphore lock = new Semaphore(6); // equal to queue capacity
for (int i = 0; i < 100000; i++ ) {
try {
lock.acquire();
service.submit(() -> {
try {
task.run();
} finally {
lock.release();
}
});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
Run Code Online (Sandbox Code Playgroud)
一些陷阱:
队列大小应该高于核心线程的数量.如果我们要使队列大小为3,最终会发生什么:
上面的例子转换为线程主线程阻塞线程1.它可能看起来像一个小周期,但现在将频率乘以天和月.突然之间,短时间内浪费了大量的时间.
小智 5
您需要做的是将ThreadPoolExecutor包装到Executor中,它明确限制其中并发执行的操作量:
private static class BlockingExecutor implements Executor {
final Semaphore semaphore;
final Executor delegate;
private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) {
semaphore = new Semaphore(concurrentTasksLimit);
this.delegate = delegate;
}
@Override
public void execute(final Runnable command) {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
final Runnable wrapped = () -> {
try {
command.run();
} finally {
semaphore.release();
}
};
delegate.execute(wrapped);
}
}
Run Code Online (Sandbox Code Playgroud)
您可以将concurrentTasksLimit调整为委托执行程序的threadPoolSize + queueSize,它几乎可以解决您的问题
这就是我最终做的:
int NUM_THREADS = 6;
Semaphore lock = new Semaphore(NUM_THREADS);
ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < 100000; i++) {
try {
lock.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
pool.execute(() -> {
try {
// Task logic
} finally {
lock.release();
}
});
}
Run Code Online (Sandbox Code Playgroud)