实践中的Java并发:BoundedExecutor中的竞争条件?

Jan*_*sch 8 java concurrency race-condition executor

BoundedExecutorJava Concurrency in Practice一书中的实现有些奇怪.

当有足够的线程在Executor中排队或运行时,它应该通过阻止提交线程来限制任务提交给Executor.

这是实现(在catch子句中添加缺少的rethrow之后):

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command) throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();

        try {
            exec.execute(new Runnable() {
                @Override public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }
Run Code Online (Sandbox Code Playgroud)

当我BoundedExecutor用a Executors.newCachedThreadPool()和4的实例化实例化时,我希望缓存的线程池实例化的线程数永远不会超过4.但实际上,它确实如此.我已经得到了这个小测试程序来创建多达11个线程:

public static void main(String[] args) throws Exception {
    class CountingThreadFactory implements ThreadFactory {
        int count;

        @Override public Thread newThread(Runnable r) {
            ++count;
            return new Thread(r);
        }           
    }

    List<Integer> counts = new ArrayList<Integer>();

    for (int n = 0; n < 100; ++n) {
        CountingThreadFactory countingThreadFactory = new CountingThreadFactory();
        ExecutorService exec = Executors.newCachedThreadPool(countingThreadFactory);

        try {
            BoundedExecutor be = new BoundedExecutor(exec, 4);

            for (int i = 0; i < 20000; ++i) {
                be.submitTask(new Runnable() {
                    @Override public void run() {}
                });
            }
        } finally {
            exec.shutdown();
        }

        counts.add(countingThreadFactory.count);
    }

    System.out.println(Collections.max(counts));
}
Run Code Online (Sandbox Code Playgroud)

我认为在信号量发布和任务结束之间有一点点时间框架,其中另一个线程可以获取许可并在发布线程尚未完成时提交任务.换句话说,它具有竞争条件.

有人能证实吗?

小智 10

BoundedExecutor确实是为了说明如何限制任务提交,而不是作为一种限制线程池大小的方法.至少有一条评论指出,有更多直接的方法来实现后者.

但是其他答案没有提到书中使用无界队列的文本

将信号量上的绑定设置为等于池大小加上要允许的排队任务数,因为信号量绑定了当前正在执行和等待执行的任务数.[JCiP,8.3.3节末]

通过提及无界队列和池大小,我们暗示(显然不是很清楚)使用有界大小的线程池.

然而,一直困扰我的BoundedExecutor是它没有实现ExecutorService接口.实现类似功能并仍然实现标准接口的现代方法是使用Guava的listenDecorator方法和ForwardingListeningExecutorService类.


Joh*_*int 5

你对竞争条件的分析是正确的.ExecutorService和Semaphore之间没有同步保证.

但是,我不知道是否限制线程数是BoundedExecutor用于什么.我认为更多的是限制提交给服务的任务数量.想象一下,如果你有500万个需要提交的任务,如果你提交了超过10,000个任务就会耗尽你的内存.

那么你在任何特定的时间都只会运行4个线程,你为什么要尝试排队所有500万个任务呢?您可以使用与此类似的构造来限制在任何给定时间排队的任务数.你应该得到的是,在任何给定的时间,只有4个任务在运行.

显然,解决这个问题的方法是使用a Executors.newFixedThreadPool(4).


Pet*_*rey 2

我看到一次创建了多达 9 个线程。我怀疑存在竞争条件,导致线程数量超过所需数量。

这可能是因为在运行任务之前和之后都有需要完成的工作。这意味着即使代码块内只有 4 个线程,但仍有许多线程停止前一个任务或准备开始新任务。

即线程仍在运行时执行release()。尽管这是你做的最后一件事,但它并不是在获得新任务之前做的最后一件事。