"Java 8 in Action"对它提供的演示有误吗?

Hea*_*ren 3 java asynchronous java-8 completable-future

此代码引自Java 8 in Action,也出现在本书11.4.3中.

public Stream<CompletableFuture<String>> findPricesStream(String product) {
    return shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
}
Run Code Online (Sandbox Code Playgroud)

沿着代码,编写器附上如下,表示applyDiscount()同一个线程中的工作getPrice(),我强烈怀疑:这里有两个不同的Async后缀,这意味着第二个调用应该在另一个线程中.

在此输入图像描述

我使用以下代码在本地测试它:

private static void testBasic() {
    out.println("*****************************************");
    out.println("********** TESTING thenCompose **********");
    CompletableFuture[] futures = IntStream.rangeClosed(0, LEN).boxed()
            .map(i -> CompletableFuture.supplyAsync(() -> runStage1(i), EXECUTOR_SERVICE))
            .map(future -> future.thenCompose(i -> CompletableFuture.supplyAsync(() -> runStage2(i), EXECUTOR_SERVICE)))
            .toArray(size -> new CompletableFuture[size]);
    CompletableFuture.allOf(futures).join();
}
Run Code Online (Sandbox Code Playgroud)

输出进一步证明了我的想法,这是正确的吗?

*****************************************
********** TESTING thenCompose **********
Start: stage - 1 - value: 0 - thread name: pool-1-thread-1
Start: stage - 1 - value: 1 - thread name: pool-1-thread-2
Start: stage - 1 - value: 2 - thread name: pool-1-thread-3
Start: stage - 1 - value: 3 - thread name: pool-1-thread-4
Finish: stage - 1 - value: 3 - thread name: pool-1-thread-4 - time cost: 1520
Start: stage - 2 - value: 3 - thread name: pool-1-thread-5
Finish: stage - 1 - value: 0 - thread name: pool-1-thread-1 - time cost: 1736
Start: stage - 2 - value: 0 - thread name: pool-1-thread-6
Finish: stage - 1 - value: 2 - thread name: pool-1-thread-3 - time cost: 1761
Start: stage - 2 - value: 2 - thread name: pool-1-thread-7
Finish: stage - 2 - value: 2 - thread name: pool-1-thread-7 - time cost: 446
Finish: stage - 1 - value: 1 - thread name: pool-1-thread-2 - time cost: 2249
Start: stage - 2 - value: 1 - thread name: pool-1-thread-8
Finish: stage - 2 - value: 3 - thread name: pool-1-thread-5 - time cost: 828
Finish: stage - 2 - value: 0 - thread name: pool-1-thread-6 - time cost: 704
Finish: stage - 2 - value: 1 - thread name: pool-1-thread-8 - time cost: 401
Run Code Online (Sandbox Code Playgroud)

Java 8 in Action是错的吗?

谢谢你,@霍尔格.你让晶莹剔透我现在对执行线程异步非异步方法.特别是在检查其规格后,进一步证明了你的观点.

为非异步方法的依赖完成提供的动作可以由完成当前CompletableFuture的线程执行,或者由完成方法的任何其他调用者执行.

Hol*_*ger 7

首先要注意的是,由于不必要地分成多个Stream操作,该代码会分散正在发生的事情.

此外,没有任何意义

future.thenCompose(quote ->
    CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor))
Run Code Online (Sandbox Code Playgroud)

代替

future.thenApplyAsync(quote -> Discount.applyDiscount(quote), executor)
Run Code Online (Sandbox Code Playgroud)

因此,一个更简单的例子就是这样做

public Stream<CompletableFuture<String>> findPricesStream(String product) {
    return shops.stream().map(
        shop -> CompletableFuture
            .supplyAsync(() -> shop.getPrice(product), executor)
            .thenApply(Quote::parse)
            .thenApplyAsync(quote -> Discount.applyDiscount(quote), executor));
}
Run Code Online (Sandbox Code Playgroud)

但是,你是对的,没有保证getPrice并且applyDiscount在同一个线程中运行 - 除非执行程序是单线程执行程序.

您可以将"执行程序线程"解释为"执行程序的一个线程",但即使这样,图中也存在一个危险的错误点,即" new Quote(price)",显然实际上意味着" Quote::parse".这一步也不会属于右侧,作为实际线程评价传递给函数thenApply是不确定的.它可能是完成上一阶段后执行程序的线程之一,但在调用时它也可能是"你的线程" thenApply,例如,如果异步操作设法在中间完成.

CompletableFuture提议没有办法执行了相关的操作,使用第一阶段的完成线程.

除非您使用简单的顺序代码,当然:

public Stream<CompletableFuture<String>> findPricesStream(String product) {
    return shops.stream().map(shop -> CompletableFuture
        .supplyAsync(() -> Discount.applyDiscount(Quote.parse(shop.getPrice(product))), executor));
}
Run Code Online (Sandbox Code Playgroud)

然后,右侧的线性图片的图片将是正确的.