队列已满时ThreadPoolExecutor阻塞?

ghe*_*ton 53 java concurrency multithreading executorservice executor

我试图使用ThreadPoolExecutor执行许多任务.以下是一个假设的例子:

def workQueue = new ArrayBlockingQueue<Runnable>(3, false)
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue)
for(int i = 0; i < 100000; i++)
    threadPoolExecutor.execute(runnable)
Run Code Online (Sandbox Code Playgroud)

问题是我很快得到了java.util.concurrent.RejectedExecutionException,因为任务数量超过了工作队列的大小.但是,我正在寻找的所需行为是让主线程阻塞,直到队列中有空间.完成此任务的最佳方法是什么?

Dar*_*roy 63

在一些非常狭窄的情况下,您可以实现执行所需操作的java.util.concurrent.RejectedExecutionHandler.

RejectedExecutionHandler block = new RejectedExecutionHandler() {
  rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
     executor.getQueue().put( r );
  }
};

ThreadPoolExecutor pool = new ...
pool.setRejectedExecutionHandler(block);
Run Code Online (Sandbox Code Playgroud)

现在.由于以下原因,这是一个非常糟糕的主意

  • 它很容易出现死锁,因为池中的所有线程都可能在您放入队列中的事物可见之前死亡.通过设置合理的保持活动时间来缓解此问题.
  • 任务没有像Executor所期望的那样包装.许多执行程序实现在执行之前将其任务包装在某种跟踪对象中.看看你的来源.
  • API强烈建议不要通过getQueue()添加,并且可能在某些时候被禁止.

一个几乎总是更好的策略是安装ThreadPoolExecutor.CallerRunsPolicy,它将通过在调用execute()的线程上运行任务来限制你的应用程序.

但是,有时候一个具有所有固有风险的阻止策略实际上就是你想要的.我会说在这些条件下

  • 你只有一个线程调用execute()
  • 您必须(或想要)拥有非常小的队列长度
  • 你绝对需要限制运行这项工作的线程数(通常是出于外部原因),并且调用者运行策略会破坏它.
  • 您的任务大小不可预测,因此如果池暂时忙于4个短任务而且您的一个线程调用执行程序遇到大问题,则调用者运行可能会导致饥饿.

所以,正如我所说.它很少需要并且可能很危险,但是你去了.

祝好运.

  • 一个非常深思熟虑的回应.我的条件是"你必须(或想要)拥有非常小的队列长度." 您可能无法预测给定作业将排队的任务数.也许你正在运行一个处理来自某个数据库的数据的日常工作,周一有500个记录要处理,但周二有50,000个.你必须在你的队列上设置一个上限,这样你就不会在一份大工作结束时砸你的堆.在那种情况下,在排队更多之前等待某些任务完成是没有害处的. (3认同)
  • "它很容易出现死锁,因为池中的所有线程都可能在您放入队列中的事物可见之前死亡.通过设置合理的保持活动时间来缓解此问题." 通过将最小池大小设置为大于零的值,是否可以完全避免死锁?所有其他原因都是Java的后果,没有内置支持阻止放入执行程序队列.这很有趣,因为它似乎是一个非常合理的策略.我想知道原理是什么. (2认同)
  • 阻止策略的另一个条件可能是执行顺序很重要。CallerRunsPolicy表示被拒绝的任务很可能会在执行程序中的其他未决项目之前执行。 (2认同)

小智 5

您可以使用a semaphore来阻止线程进入池中.

ExecutorService service = new ThreadPoolExecutor(
    3, 
    3, 
    1, 
    TimeUnit.HOURS, 
    new ArrayBlockingQueue<>(6, false)
);

Semaphore lock = new Semaphore(6); // equal to queue capacity

for (int i = 0; i < 100000; i++ ) {
    try {
        lock.acquire();
        service.submit(() -> {
            try {
              task.run();
            } finally {
              lock.release();
            }
        });
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
Run Code Online (Sandbox Code Playgroud)

一些陷阱:

  • 仅将此模式与固定线程池一起使用.队列不太可能经常填满,因此不会创建新线程.查看ThreadPoolExecutor上的java文档以获取更多详细信息:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html有一种解决方法,但它已经过时了这个答案的范围.
  • 队列大小应该高于核心线程的数量.如果我们要使队列大小为3,最终会发生什么:

    • T0:所有三个线程都在工作,队列为空,没有许可证可用.
    • T1:线程1完成,释放许可证.
    • T2:线程1轮询队列以寻找新工作,找不到,并等待.
    • T3:主线程将工作提交到池中,线程1开始工作.

    上面的例子转换为线程主线程阻塞线程1.它可能看起来像一个小周期,但现在将频率乘以天和月.突然之间,短时间内浪费了大量的时间.


小智 5

您需要做的是将ThreadPoolExecutor包装到Executor中,它明确限制其中并发执行的操作量:

 private static class BlockingExecutor implements Executor {

    final Semaphore semaphore;
    final Executor delegate;

    private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) {
        semaphore = new Semaphore(concurrentTasksLimit);
        this.delegate = delegate;
    }

    @Override
    public void execute(final Runnable command) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }

        final Runnable wrapped = () -> {
            try {
                command.run();
            } finally {
                semaphore.release();
            }
        };

        delegate.execute(wrapped);

    }
}
Run Code Online (Sandbox Code Playgroud)

您可以将concurrentTasksLimit调整为委托执行程序的threadPoolSize + queueSize,它几乎可以解决您的问题


dis*_*upt 5

这就是我最终做的:

int NUM_THREADS = 6;
Semaphore lock = new Semaphore(NUM_THREADS);
ExecutorService pool = Executors.newCachedThreadPool();

for (int i = 0; i < 100000; i++) {
    try {
        lock.acquire();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    pool.execute(() -> {
        try {
            // Task logic
        } finally {
            lock.release();
        }
    });
}
Run Code Online (Sandbox Code Playgroud)