如何实现阻塞线程池执行器?

Jok*_*ker 2 java multithreading threadpoolexecutor

我们有一个大文本文件,其中每一行都需要密集的process. 设计是让 aclass读取文件并将每一行的处理委托给 a thread, via thread pool。一旦池中没有空闲线程来进行处理,就应该阻止文件读取器类读取下一行。所以我需要一个blocking thread pool

在当前的实现ThreadPoolExecutor.submit()ThreadPoolExecutor.execute()方法中RejectedExecutionException,在配置的线程数变得忙碌之后抛出异常,如下面的代码片段所示。

public class BlockingTp {

    public static void main(String[] args) {
        BlockingQueue blockingQueue = new ArrayBlockingQueue(3);
        ThreadPoolExecutor executorService=
            new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, blockingQueue);
        int Jobs = 10;
        System.out.println("Starting application with " + Jobs + " jobs");
        for (int i = 1; i <= Jobs; i++)
            try {
                executorService.submit(new WorkerThread(i));
                System.out.println("job added " + (i));
            } catch (RejectedExecutionException e) {
                System.err.println("RejectedExecutionException");
            }
    }
}

class WorkerThread implements Runnable {
    int job;
    public WorkerThread(int job) {
        this.job = job;
    }
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (Exception excep) {
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

上述程序的输出是

Starting application to add 10 jobs
Added job #1
Added job #2
Added job #3
Added job #4
Added job #5
Added job #6
RejectedExecutionException
RejectedExecutionException
RejectedExecutionException
RejectedExecutionException
Run Code Online (Sandbox Code Playgroud)

有人可以抛出一些光,即我如何实现阻塞线程池

Gra*_*ray 7

有人可以抛出一些光,即我如何实现阻塞线程池。

您需要在执行程序服务上设置拒绝执行处理程序。当线程将作业放入执行器时,它将阻塞,直到阻塞队列中有空间为止。

BlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3);
ThreadPoolExecutor executorService =
     new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, arrayBlockingQueue);
// when the blocking queue is full, this tries to put into the queue which blocks
executorService.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // block until there's room
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Producer interrupted", e);
        }
    }
});
Run Code Online (Sandbox Code Playgroud)

因此,不是 TRE 抛出 a RejectedExecutionException,而是调用拒绝处理程序,后者将尝试将作业放回队列中。这会阻止调用者。

  • 像这样直接插入工作队列是不安全的,因为它绕过了 ThreadPoolExecutor 用来管理其工作池状态的逻辑。例如,它没有正确检查池是否已关闭,这意味着对 execute() 的调用可能会挂起而不是正确拒绝任务。 (2认同)