如何捕获CompletableFuture的whenCompleteAsync调用中抛出的RejectedExecutionException?

Sud*_*era 4 java future executorservice java-8 completable-future

下面的示例代码我正在注入一个biconsumer睡眠时间为100毫秒,作为一组可完成的未来的完成动作.我whenCompleteAsync通过单独executorService使用来使用方法.executorService是一个ThreadPoolExecutor与芯池大小5,最大尺寸5和1队列的长度.

public class CompleteTest {
    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(5, 5, 10,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));

        ArrayList<CompletableFuture<String>> list = new ArrayList<>();

        for (int i = 0; i <100; i++) {
            CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>();
            stringCompletableFuture.whenCompleteAsync((e, a) -> {
                System.out.println("Complete " + e);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e1) {e1.printStackTrace();}
            }, executorService);

            list.add(stringCompletableFuture);
        }

        for (int i = 0; i < list.size(); i++) {
            list.get(i).complete(i + "");
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

当我运行代码时,即使我正在完成100个期货,也只会打印6个输出.这是5个核心线程和1个排队的线程.剩下的会怎么样?如果其他runnable由于队列已满而无法提交给执行程序服务,则不应该有异常.

产量

Complete 0
Complete 1
Complete 2
Complete 3
Complete 4
Complete 5
Run Code Online (Sandbox Code Playgroud)

Sot*_*lis 5

抛出异常,并且CompletableFuture异常完成a,而不是您正在跟踪的任何异常.

您正在ThreadPoolExecutor使用构造函数实例化和初始化a ,该构造函数使用RejectedExecutionHandler仅抛出异常的默认值.我们知道,RejectedExecutionException如果一个人ExecutorService无法接受任务,就会被抛出.那么任务添加到哪里以及抛出的异常在哪里?

就目前而言,所有的链接都在其中发生whenCompleteAsync.当你调用它时,你将一个依赖项添加到接收器CompletableFuture,stringCompletableFuture.何时stringCompletableFuture完成(在这种情况下成功),它将创建一个新的CompletableFuture(它返回)并尝试在给定的BiConsumer时间上安排给定的ExecutorService.

由于ExecutorService队列没有空间,它将调用RejectedExecutionHandler将抛出的队列RejectedExecutionException.该异常当时捕获并用于completeExceptionallyCompletableFuture将被退回.

换句话说,在for循环中,捕获CompletableFuture返回的whenCompleteAsync,存储它,并打印出它的状态.

ArrayList<CompletableFuture<String>> list = new ArrayList<>();
ArrayList<CompletableFuture<?>> dependents = new ArrayList<>();
for (int i = 0; i <100; i++) {
    CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>();
    CompletableFuture<?> thisWillHaveException = stringCompletableFuture.whenCompleteAsync((e, a) -> {
        System.out.println("Complete " + e);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e1) {e1.printStackTrace();}
    }, executorService);
    dependents.add(thisWillHaveException);
    list.add(stringCompletableFuture);
}

for (int i = 0; i < list.size(); i++) {
    list.get(i).complete(i + "");
}
Thread.sleep(2000);
dependents.forEach(cf -> {
    cf.whenComplete((r, e) -> {
        if (e != null)
            System.out.println(cf + " " + e.getMessage());
    });
});
Run Code Online (Sandbox Code Playgroud)

您会注意到它们都是(除了之前成功打印的6个)以外的特殊情况RejectedExecutionException.

...
java.util.concurrent.CompletableFuture@2d8e6db6[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@3f91beef rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
java.util.concurrent.CompletableFuture@23ab930d[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@1a6c5a9e rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
java.util.concurrent.CompletableFuture@4534b60d[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@37bba400 rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
Run Code Online (Sandbox Code Playgroud)