CompletableFuture allof(..).join() vs CompletableFuture.join()

G S*_*noy 8 java spring multithreading threadpool completable-future

我目前正在使用 CompletableFuture supplyAsync() 方法将一些任务提交到公共线程池。下面是代码片段的样子:

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).join();

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.getNow())
        .forEach(tests::addAll);
Run Code Online (Sandbox Code Playgroud)

我想知道下面的代码与上面的代码有何不同。我从下面的代码中删除了父 completableFuture,并为每个 completableFuture 添加了 join() 而不是 getNow():

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.join())
        .forEach(tests::addAll);
Run Code Online (Sandbox Code Playgroud)

我在 spring 服务中使用它,并且存在线程池耗尽的问题。任何指针都深表感谢。

Hol*_*ger 9

首先,.getNow()不起作用,因为此方法需要一个回退值作为未来尚未完成的情况的参数。由于您假设未来要在此处完成,因此您还应该使用join().

然后,线程耗尽没有区别,因为在任何一种情况下,您都在等待所有作业完成后再继续,这可能会阻塞当前线程。

避免这种情况的唯一方法是重构代码以不期望同步结果,而是在所有作业完成后安排后续处理操作完成。然后,使用allOf变得相关:

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
    .map(resolver -> supplyAsync(() -> task.doWork()))
    .collect(toList());

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
    .thenAccept(justVoid -> {
        // here, all jobs have been completed
        final List<Test> tests = completableFutures.stream()
            .flatMap(completableFuture -> completableFuture.join().stream())
            .collect(toList());
        // process the result here
    });
Run Code Online (Sandbox Code Playgroud)

顺便说一句,关于toArray收藏的方法,我推荐阅读《上古智慧阵》 ……