Jan*_*sch 8 java concurrency race-condition executor
BoundedExecutorJava Concurrency in Practice一书中的实现有些奇怪.
当有足够的线程在Executor中排队或运行时,它应该通过阻止提交线程来限制任务提交给Executor.
这是实现(在catch子句中添加缺少的rethrow之后):
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command) throws InterruptedException, RejectedExecutionException {
semaphore.acquire();
try {
exec.execute(new Runnable() {
@Override public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
throw e;
}
}
Run Code Online (Sandbox Code Playgroud)
当我BoundedExecutor用a Executors.newCachedThreadPool()和4的实例化实例化时,我希望缓存的线程池实例化的线程数永远不会超过4.但实际上,它确实如此.我已经得到了这个小测试程序来创建多达11个线程:
public static void main(String[] args) throws Exception {
class CountingThreadFactory implements ThreadFactory {
int count;
@Override public Thread newThread(Runnable r) {
++count;
return new Thread(r);
}
}
List<Integer> counts = new ArrayList<Integer>();
for (int n = 0; n < 100; ++n) {
CountingThreadFactory countingThreadFactory = new CountingThreadFactory();
ExecutorService exec = Executors.newCachedThreadPool(countingThreadFactory);
try {
BoundedExecutor be = new BoundedExecutor(exec, 4);
for (int i = 0; i < 20000; ++i) {
be.submitTask(new Runnable() {
@Override public void run() {}
});
}
} finally {
exec.shutdown();
}
counts.add(countingThreadFactory.count);
}
System.out.println(Collections.max(counts));
}
Run Code Online (Sandbox Code Playgroud)
我认为在信号量发布和任务结束之间有一点点时间框架,其中另一个线程可以获取许可并在发布线程尚未完成时提交任务.换句话说,它具有竞争条件.
有人能证实吗?
小智 10
BoundedExecutor确实是为了说明如何限制任务提交,而不是作为一种限制线程池大小的方法.至少有一条评论指出,有更多直接的方法来实现后者.
但是其他答案没有提到书中使用无界队列的文本
将信号量上的绑定设置为等于池大小加上要允许的排队任务数,因为信号量绑定了当前正在执行和等待执行的任务数.[JCiP,8.3.3节末]
通过提及无界队列和池大小,我们暗示(显然不是很清楚)使用有界大小的线程池.
然而,一直困扰我的BoundedExecutor是它没有实现ExecutorService接口.实现类似功能并仍然实现标准接口的现代方法是使用Guava的listenDecorator方法和ForwardingListeningExecutorService类.
你对竞争条件的分析是正确的.ExecutorService和Semaphore之间没有同步保证.
但是,我不知道是否限制线程数是BoundedExecutor用于什么.我认为更多的是限制提交给服务的任务数量.想象一下,如果你有500万个需要提交的任务,如果你提交了超过10,000个任务就会耗尽你的内存.
那么你在任何特定的时间都只会运行4个线程,你为什么要尝试排队所有500万个任务呢?您可以使用与此类似的构造来限制在任何给定时间排队的任务数.你应该得到的是,在任何给定的时间,只有4个任务在运行.
显然,解决这个问题的方法是使用a Executors.newFixedThreadPool(4).
我看到一次创建了多达 9 个线程。我怀疑存在竞争条件,导致线程数量超过所需数量。
这可能是因为在运行任务之前和之后都有需要完成的工作。这意味着即使代码块内只有 4 个线程,但仍有许多线程停止前一个任务或准备开始新任务。
即线程仍在运行时执行release()。尽管这是你做的最后一件事,但它并不是在获得新任务之前做的最后一件事。
| 归档时间: |
|
| 查看次数: |
1511 次 |
| 最近记录: |