作者使用 thenCompose 而不是 thenComposeAsync 的原因是否正确

Tas*_*isi 8 java java-8 completable-future

这个问题不同于这个Java8 thenCompose and thenComposeAsync之间的区别,因为我想知道作者使用thenCompose而不是的原因是什么thenComposeAsync

我正在阅读《现代 Java 实战》,在第 405 页上遇到了这部分代码:

public static List<String> findPrices(String product) {
    ExecutorService executor = Executors.newFixedThreadPool(10);
    List<Shop> shops = Arrays.asList(new Shop(), new Shop());
    List<CompletableFuture<String>> priceFutures = shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenCompose(quote ->
                    CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
            .collect(toList());
    return priceFutures.stream()
            .map(CompletableFuture::join).collect(toList());
}
Run Code Online (Sandbox Code Playgroud)

一切正常,我可以理解这段代码,但这是作者为什么没有thenComposeAsync在第 408 页使用的原因,我无法理解:

通常,名称中没有 Async 后缀的方法在与前一个任务相同的线程中执行其任务,而以 Async 终止的方法总是将后续任务提交到线程池,因此每个任务都可以由不同的线程处理线。在这种情况下,第二个 CompletableFuture 的结果取决于第一个,因此无论您使用此方法的一个或另一个变体组合两个 CompletableFuture,对最终结果或其粗略计时都没有影响

根据我对thenCompose( 和thenComposeAsync) 签名的理解,如下所示:

public <U> CompletableFuture<U> thenCompose(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(null, fn);
}

public <U> CompletableFuture<U> thenComposeAsync(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(asyncPool, fn);
}
Run Code Online (Sandbox Code Playgroud)

在许多情况下,第二个CompletableFuturecan的结果取决于前一个CompletableFuture(或者我可以说几乎总是这样),我们应该在这些情况下使用thenCompose而不是thenComposeAsync

如果我们在第二个中有阻塞代码CompletableFuture怎么办?

这是一个类似的例子,由在这里回答类似问题的人给出:Difference between Java8 thenCompose and thenComposeAsync

public CompletableFuture<String> requestData(Quote quote) {
    Request request = blockingRequestForQuote(quote);
    return CompletableFuture.supplyAsync(() -> sendRequest(request));
}
Run Code Online (Sandbox Code Playgroud)

在我看来,在这种情况下使用thenComposeAsync可以使我们的程序更快,因为这里blockingRequestForQuote可以在不同的线程上运行。但是根据作者的意见,我们不应该使用thenComposeAsync它,因为它取决于第一个CompletableFuture结果(即 Quote)。

我的问题是:

作者的想法是否正确,他说:

在这种情况下,第二个 CompletableFuture 的结果取决于第一个,因此无论您使用此方法的一个或另一个变体组合两个 CompletableFuture,对最终结果或其粗略计时都没有影响

Hol*_*ger 9

TL;DR使用thenCompose而不是在thenComposeAsync这里是正确的,但不是出于所引用的原因。通常,代码示例不应用作您自己代码的模板。


本章是 Stackoverflow 上反复出现的主题,原因我们可以最好地描述为“质量不足”,以保持礼貌。

通常,名称中没有 Async 后缀的方法在与前一个任务相同的线程中执行其任务,......

规范中对执行线程没有这样的保证。该文件说:

  • 为非异步方法的依赖完成提供的操作可以由完成当前 CompletableFuture 的线程或由完成方法的任何其他调用者执行。

因此,任务也有可能“由完成方法的任何其他调用者”执行。一个直观的例子是

CompletableFuture<X> f = CompletableFuture.supplyAsync(() -> foo())
    .thenApply(f -> f.bar());
Run Code Online (Sandbox Code Playgroud)

涉及到两个线程。一个调用supplyAsyncthenApply另一个将调用foo(). 如果第二个foo()在第一个线程进入 的执行之前完成了 的调用thenApply,则有可能未来已经完成。

未来不记得哪个线程完成了它。它也没有一些神奇的能力来告诉该线程执行某个操作,尽管它可能正忙于其他事情,甚至从那时起就已终止。所以很明显,调用thenApply一个已经完成的未来不能保证使用完成它的线程。大多数情况下,它会在调用 的线程中立即执行操作thenApply。规范的措辞“完成方法的任何其他调用者”涵盖了这一点。

但这并不是故事的结局。正如这个答案所解释的那样,当涉及两个以上的线程时,该操作也可以由另一个线程同时在未来调用不相关的完成方法来执行。这可能很少发生,但在参考实现中是可能的,并且是规范允许的。

我们可以将其总结为: 没有Async 的方法对将执行操作的线程提供最少的控制,甚至可能在调用线程中正确执行,从而导致同步行为。

因此,当执行线程无关紧要并且您不希望后台线程执行(即简短的非阻塞操作)时,它们是最好的。

而以 Async 终止的方法总是将后续任务提交给线程池,因此每个任务都可以由不同的线程处理。在这种情况下,第二个 CompletableFuture 的结果取决于第一个……

当你做

future.thenCompose(quote ->
    CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor))
Run Code Online (Sandbox Code Playgroud)

3个期货参与,所以它不完全清楚,其未来是由“第二”的意思。supplyAsync正在提交一个动作并返回一个未来。提交包含在传递给 的函数中thenCompose,该函数将返回另一个未来。

如果你thenComposeAsync在这里使用,你只要求执行supplyAsync必须提交到线程池,而不是直接在完成线程或“完成方法的任何其他调用者”中执行,例如直接在线程调用中执行thenCompose

关于依赖的推理在这里没有意义。“ then ”总是暗示依赖。如果在thenComposeAsync这里使用,则是强制将动作提交给线程池,但是在future. 如果future异常完成,则根本不会提交。

那么,thenCompose在这里使用合理吗?是的,但不是因为给出的原因是报价。如上所述,使用非异步方法意味着放弃对正在执行的线程的控制,并且应该只在线程无关紧要时使用,尤其是对于简短的非阻塞操作。调用supplyAsync是一种廉价的操作,它会自行将实际操作提交给线程池,因此可以在任何空闲线程中执行它。

然而,这是一个不必要的并发症。您可以使用

future.thenApplyAsync(quote -> Discount.applyDiscount(quote), executor)
Run Code Online (Sandbox Code Playgroud)

这将完全相同,提交applyDiscountexecutor何时future完成并产生一个代表结果的新未来。使用的组合thenComposesupplyAsync没有必要在这里。

请注意,这个例子已经在这个问答中讨论过,它还解决了未来操作在多个Stream操作上的不必要隔离以及错误的序列图。