假设我有以下代码:
CompletableFuture<Integer> future
= CompletableFuture.supplyAsync( () -> 0);
Run Code Online (Sandbox Code Playgroud)
thenApply
案件:
future.thenApply( x -> x + 1 )
.thenApply( x -> x + 1 )
.thenAccept( x -> System.out.println(x));
Run Code Online (Sandbox Code Playgroud)
这里输出为2.现在如下thenApplyAsync
:
future.thenApplyAsync( x -> x + 1 ) // first step
.thenApplyAsync( x -> x + 1 ) // second step
.thenAccept( x -> System.out.println(x)); // third step
Run Code Online (Sandbox Code Playgroud)
我在这个博客中读到,每个thenApplyAsync
都是在一个单独的线程中执行,并且"同时"(这意味着thenApplyAsyncs
在thenApplyAsyncs
完成之前开始跟随),如果是这样,如果第一步没有完成,第二步的输入参数值是多少?
如果不采取第二步,第一步的结果会在哪里?第三步将采取哪一步的结果?
如果第二步必须等待第一步的结果那么重点是Async
什么?
这里x - > x + 1只是为了表明这一点,我想知道的是在非常长的计算情况下.
在java-9 中,引入了类中的新方法completeOnTimeout
CompletableFuture
:
public CompletableFuture<T> completeOnTimeout(T value, long timeout,
TimeUnit unit) {
if (unit == null)
throw new NullPointerException();
if (result == null)
whenComplete(new Canceller(Delayer.delay(
new DelayedCompleter<T>(this, value),
timeout, unit)));
return this;
}
Run Code Online (Sandbox Code Playgroud)
我不明白为什么它在其实现中使用静态 ScheduledThreadPoolExecutor
:
static ScheduledFuture<?> delay(Runnable command, long delay,
TimeUnit unit) {
return delayer.schedule(command, delay, unit);
}
Run Code Online (Sandbox Code Playgroud)
哪里
static final ScheduledThreadPoolExecutor delayer;
static {
(delayer = new ScheduledThreadPoolExecutor(
1, new DaemonThreadFactory())).
setRemoveOnCancelPolicy(true);
}
Run Code Online (Sandbox Code Playgroud)
对我来说这是一种非常奇怪的方法,因为它可能成为整个应用程序的瓶颈:唯一一个ScheduledThreadPoolExecutor
只有一个线程保留在池中以执行所有可能的CompletableFuture
任务?
我在这里错过了什么?
PS它看起来像:
1)这段代码的作者不愿意提取这种逻辑,而是倾向于重用ScheduledThreadPoolExecutor …
java multithreading threadpoolexecutor java-9 completable-future
我(大多数)理解CompletableFuture的三种执行方法:
我的问题是:何时应该支持使用非异步方法?
如果你有一个代码块调用其他方法也返回CompletableFuture
s会发生什么?从表面上看,这可能看起来很便宜,但如果这些方法也使用非异步调用会发生什么?这不是一个长的非异步块加起来可能会变得昂贵吗?
是否应该将非异步执行的使用限制为不调用其他方法的简短定义的代码块?
Given this piece of code:
public List<String> findPrices(String product){
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor
)))
.collect(toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
}
Run Code Online (Sandbox Code Playgroud)
This part of it:
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor
)))
Run Code Online (Sandbox Code Playgroud)
Could it be rewrite as:
.map(future ->
future.thenComposeAsync(quote -> Discount.applyDiscount(quote), executor))
Run Code Online (Sandbox Code Playgroud)
I took this code from an example of a book and says the two solutions are …
这个问题不同于这个Java8 thenCompose and thenComposeAsync之间的区别,因为我想知道作者使用thenCompose
而不是的原因是什么thenComposeAsync
。
我正在阅读《现代 Java 实战》,在第 405 页上遇到了这部分代码:
public static List<String> findPrices(String product) {
ExecutorService executor = Executors.newFixedThreadPool(10);
List<Shop> shops = Arrays.asList(new Shop(), new Shop());
List<CompletableFuture<String>> priceFutures = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
.collect(toList());
return priceFutures.stream()
.map(CompletableFuture::join).collect(toList());
}
Run Code Online (Sandbox Code Playgroud)
一切正常,我可以理解这段代码,但这是作者为什么没有thenComposeAsync
在第 408 页使用的原因,我无法理解:
通常,名称中没有 Async 后缀的方法在与前一个任务相同的线程中执行其任务,而以 Async 终止的方法总是将后续任务提交到线程池,因此每个任务都可以由不同的线程处理线。在这种情况下,第二个 CompletableFuture 的结果取决于第一个,因此无论您使用此方法的一个或另一个变体组合两个 CompletableFuture,对最终结果或其粗略计时都没有影响
根据我对thenCompose
( 和thenComposeAsync
) 签名的理解,如下所示:
public <U> …
Run Code Online (Sandbox Code Playgroud) 所以我有一个方法返回一个CompletableFuture
.在返回之前,此方法添加一个块,thenAccept
在CompletableFuture
完成后执行该块.
此方法的调用者还添加了另一个块thenAccept
.显然,这可以继续多个链式调用.
调用CompletionStage
返回的顺序是什么thenAccept
?是否保证是添加它们的顺序?如果没有,如何保证它们按照添加顺序执行?
PS:我根据自己的经验CompletableFuture
和本文提出这个问题
假设我们有两个执行者,1 和 2。
我们可以配置执行时使用哪个执行器
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(()-> {return 1;}, executor1) //executor1
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(()-> {return 2;}, executor1) //executor1
CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(()-> {return 3;}, executor2) //executor2
Run Code Online (Sandbox Code Playgroud)
但是哪个线程执行器使用 CompletableFuture 静态方法 allOf?
CompletableFuture.allOf(cf1, cf2, cf3)
Run Code Online (Sandbox Code Playgroud)
谢谢!
我开始熟悉 JavaCompletableFuture
组合,使用过 JavaScript 承诺。基本上,组合只是在指定的执行器上安排了链式命令。但是我不确定在执行组合时哪个线程正在运行。
假设我有两个执行者,executor1
并且executor2
;为简单起见,假设它们是单独的线程池。我安排了一个CompletableFuture
(使用非常松散的描述):
CompletableFuture<Foo> futureFoo = CompletableFuture.supplyAsync(this::getFoo, executor1);
Run Code Online (Sandbox Code Playgroud)
然后,当做到这一点我转换Foo
到Bar
使用第二执行人:
CompletableFuture<Bar> futureBar .thenApplyAsync(this::fooToBar, executor2);
Run Code Online (Sandbox Code Playgroud)
我知道getFoo()
将从executor1
线程池中的线程调用。我知道fooToBar()
将从executor2
线程池中的线程调用。
但是什么线程用于实际的组合,即在getFoo()
完成和futureFoo()
完成之后;但之前的fooToBar()
命令被安排在executor2
?换句话说,哪个线程实际运行代码以在第二个执行器上调度第二个命令?
调度是否作为executor1
调用的同一线程的一部分执行getFoo()
?如果是这样,这个可完成的未来组合是否等同于我fooToBar()
在executor1
任务的第一个命令中自己手动安排?