And*_*niy 14 java multithreading threadpoolexecutor java-9 completable-future
在java-9 中,引入了类中的新方法completeOnTimeoutCompletableFuture:
public CompletableFuture<T> completeOnTimeout(T value, long timeout,
TimeUnit unit) {
if (unit == null)
throw new NullPointerException();
if (result == null)
whenComplete(new Canceller(Delayer.delay(
new DelayedCompleter<T>(this, value),
timeout, unit)));
return this;
}
Run Code Online (Sandbox Code Playgroud)
我不明白为什么它在其实现中使用静态 ScheduledThreadPoolExecutor:
static ScheduledFuture<?> delay(Runnable command, long delay,
TimeUnit unit) {
return delayer.schedule(command, delay, unit);
}
Run Code Online (Sandbox Code Playgroud)
哪里
static final ScheduledThreadPoolExecutor delayer;
static {
(delayer = new ScheduledThreadPoolExecutor(
1, new DaemonThreadFactory())).
setRemoveOnCancelPolicy(true);
}
Run Code Online (Sandbox Code Playgroud)
对我来说这是一种非常奇怪的方法,因为它可能成为整个应用程序的瓶颈:唯一一个ScheduledThreadPoolExecutor只有一个线程保留在池中以执行所有可能的CompletableFuture任务?
我在这里错过了什么?
PS它看起来像:
1)这段代码的作者不愿意提取这种逻辑,而是倾向于重用ScheduledThreadPoolExecutor,
2)这显然导致了静态变量的这种解决方案,因为为每个变量创建一个新的执行器是非常低效的CompletableFuture.
但我仍然怀疑,因为我觉得一般的做法很奇怪.
你是对的,这可能成为一个瓶颈,但不是完成本身,这只是设置一个变量CompletableFuture.这个单线程可以在一秒钟内完成数百万个期货.关键方面是完成可以触发完成线程内的依赖阶段的评估.
所以
Executor neverDone = r -> {};
long t0 = System.nanoTime();
CompletableFuture<String> c11 =
CompletableFuture.supplyAsync(() -> "foo", neverDone)
.completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
.thenApply(s -> {
System.out.println("long dependent action 1 "+Thread.currentThread());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
return s;
});
CompletableFuture<String> c12 =
CompletableFuture.supplyAsync(() -> "bar", neverDone)
.completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
.thenApply(s -> {
System.out.println("long dependent action 2 "+Thread.currentThread());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
return s;
});
System.out.println("set up");
CompletableFuture.allOf(
c11.thenAccept(System.out::println),
c12.thenAccept(System.out::println)
).join();
System.out.println(Math.round((System.nanoTime()-t0)*1e-9)+" s");
Run Code Online (Sandbox Code Playgroud)
将打印
set up
long dependent action 1 Thread[CompletableFutureDelayScheduler,5,main]
timeout
long dependent action 2 Thread[CompletableFutureDelayScheduler,5,main]
timeout
12 s
Run Code Online (Sandbox Code Playgroud)
使用…Async链接方法将消除该问题
Executor neverDone = r -> {};
long t0 = System.nanoTime();
CompletableFuture<String> c11 =
CompletableFuture.supplyAsync(() -> "foo", neverDone)
.completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
.thenApplyAsync(s -> {
System.out.println("long dependent action 1 "+Thread.currentThread());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
return s;
});
CompletableFuture<String> c12 =
CompletableFuture.supplyAsync(() -> "bar", neverDone)
.completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
.thenApplyAsync(s -> {
System.out.println("long dependent action 2 "+Thread.currentThread());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
return s;
});
System.out.println("set up");
CompletableFuture.allOf(
c11.thenAccept(System.out::println),
c12.thenAccept(System.out::println)
).join();
System.out.println(Math.round((System.nanoTime()-t0)*1e-9)+" s");
Run Code Online (Sandbox Code Playgroud)
将打印
set up
long dependent action 2 Thread[ForkJoinPool.commonPool-worker-2,5,main]
long dependent action 1 Thread[ForkJoinPool.commonPool-worker-9,5,main]
timeout
timeout
7 s
Run Code Online (Sandbox Code Playgroud)
结论是,当您进行可能冗长的评估时,您应始终通过其中一种…Async方法进行链接.鉴于在使用没有"... Async"后缀的方法时没有对执行线程的控制(它也可能是调用链接方法的线程或任何其他调用"完成方法"的线程,请参阅此答案),这是你应该做什么.