我有一个关于CompletableFuture方法的问题:
public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)
Run Code Online (Sandbox Code Playgroud)
事情是JavaDoc说的就是这样:
返回一个新的CompletionStage,当该阶段正常完成时,将使用此阶段的结果作为所提供函数的参数执行.有关特殊完成的规则,请参阅CompletionStage文档.
线程怎么样?这将在哪个线程中执行?如果未来由线程池完成怎么办?
Given this piece of code:
public List<String> findPrices(String product){
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor
)))
.collect(toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
}
Run Code Online (Sandbox Code Playgroud)
This part of it:
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor
)))
Run Code Online (Sandbox Code Playgroud)
Could it be rewrite as:
.map(future ->
future.thenComposeAsync(quote -> Discount.applyDiscount(quote), executor))
Run Code Online (Sandbox Code Playgroud)
I took this code from an example of a book and says the two solutions are …
此代码引自Java 8 in Action,也出现在本书11.4.3中.
public Stream<CompletableFuture<String>> findPricesStream(String product) {
return shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
}
Run Code Online (Sandbox Code Playgroud)
沿着代码,编写器附上如下图,表示applyDiscount()同一个线程中的工作getPrice(),我强烈怀疑:这里有两个不同的Async后缀,这意味着第二个调用应该在另一个线程中.
我使用以下代码在本地测试它:
private static void testBasic() {
out.println("*****************************************");
out.println("********** TESTING thenCompose **********");
CompletableFuture[] futures = IntStream.rangeClosed(0, LEN).boxed()
.map(i -> CompletableFuture.supplyAsync(() -> runStage1(i), EXECUTOR_SERVICE))
.map(future -> future.thenCompose(i -> CompletableFuture.supplyAsync(() -> runStage2(i), EXECUTOR_SERVICE)))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
}
Run Code Online (Sandbox Code Playgroud)
输出进一步证明了我的想法,这是正确的吗?
***************************************** …Run Code Online (Sandbox Code Playgroud) 我正在学习 CompletableFuture API,有一个例子:
CompletableFuture.completedFuture(url)
.thenComposeAsync(this::readPage, executor)
.thenApply(this::getImageURLs)
.thenApply(this::saveFoundImages)
.....
Run Code Online (Sandbox Code Playgroud)
我有一个问题:如果我将thenComposeAsync(...)方法作为第一个调用,链中的其他方法会在executor我通过参数传递的方法中执行,还是应该使用async调用其他方法以在特定执行程序中异步执行?
java concurrency multithreading asynchronous completable-future