CompletableFuture.Delayer中的静态ScheduledThreadPoolExecutor

And*_*niy 14 java multithreading threadpoolexecutor java-9 completable-future

在java-9 中,引入了类中的新方法completeOnTimeoutCompletableFuture:

public CompletableFuture<T> completeOnTimeout(T value, long timeout,
                                              TimeUnit unit) {
    if (unit == null)
        throw new NullPointerException();
    if (result == null)
        whenComplete(new Canceller(Delayer.delay(
                                       new DelayedCompleter<T>(this, value),
                                       timeout, unit)));
    return this;
}
Run Code Online (Sandbox Code Playgroud)

我不明白为什么它在其实现中使用静态 ScheduledThreadPoolExecutor:

    static ScheduledFuture<?> delay(Runnable command, long delay,
                                    TimeUnit unit) {
        return delayer.schedule(command, delay, unit);
    }
Run Code Online (Sandbox Code Playgroud)

哪里

    static final ScheduledThreadPoolExecutor delayer;
    static {
        (delayer = new ScheduledThreadPoolExecutor(
            1, new DaemonThreadFactory())).
            setRemoveOnCancelPolicy(true);
    }
Run Code Online (Sandbox Code Playgroud)

对我来说这是一种非常奇怪的方法,因为它可能成为整个应用程序的瓶颈:唯一一个ScheduledThreadPoolExecutor只有一个线程保留在池中以执行所有可能的CompletableFuture任务?

我在这里错过了什么?

PS它看起来像:

1)这段代码的作者不愿意提取这种逻辑,而是倾向于重用ScheduledThreadPoolExecutor,

2)这显然导致了静态变量的这种解决方案,因为为每个变量创建一个新的执行器是非常低效的CompletableFuture.

但我仍然怀疑,因为我觉得一般的做法很奇怪.

Hol*_*ger 6

你是对的,这可能成为一个瓶颈,但不是完成本身,这只是设置一个变量CompletableFuture.这个单线程可以在一秒钟内完成数百万个期货.关键方面是完成可以触发完成线程内的依赖阶段的评估.

所以

Executor neverDone = r -> {};
long t0 = System.nanoTime();
CompletableFuture<String> c11 =
    CompletableFuture.supplyAsync(() -> "foo", neverDone)
        .completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
        .thenApply(s -> {
            System.out.println("long dependent action 1 "+Thread.currentThread());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            return s;
        });
CompletableFuture<String> c12 =
    CompletableFuture.supplyAsync(() -> "bar", neverDone)
        .completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
        .thenApply(s -> {
            System.out.println("long dependent action 2 "+Thread.currentThread());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            return s;
        });
System.out.println("set up");
CompletableFuture.allOf(
    c11.thenAccept(System.out::println),
    c12.thenAccept(System.out::println)
).join();
System.out.println(Math.round((System.nanoTime()-t0)*1e-9)+" s");
Run Code Online (Sandbox Code Playgroud)

将打印

set up
long dependent action 1 Thread[CompletableFutureDelayScheduler,5,main]
timeout
long dependent action 2 Thread[CompletableFutureDelayScheduler,5,main]
timeout
12 s
Run Code Online (Sandbox Code Playgroud)

使用…Async链接方法将消除该问题

Executor neverDone = r -> {};
long t0 = System.nanoTime();
CompletableFuture<String> c11 =
    CompletableFuture.supplyAsync(() -> "foo", neverDone)
        .completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
        .thenApplyAsync(s -> {
            System.out.println("long dependent action 1 "+Thread.currentThread());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            return s;
        });
CompletableFuture<String> c12 =
    CompletableFuture.supplyAsync(() -> "bar", neverDone)
        .completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
        .thenApplyAsync(s -> {
            System.out.println("long dependent action 2 "+Thread.currentThread());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            return s;
        });
System.out.println("set up");
CompletableFuture.allOf(
    c11.thenAccept(System.out::println),
    c12.thenAccept(System.out::println)
).join();
System.out.println(Math.round((System.nanoTime()-t0)*1e-9)+" s");
Run Code Online (Sandbox Code Playgroud)

将打印

set up
long dependent action 2 Thread[ForkJoinPool.commonPool-worker-2,5,main]
long dependent action 1 Thread[ForkJoinPool.commonPool-worker-9,5,main]
timeout
timeout
7 s
Run Code Online (Sandbox Code Playgroud)

结论是,当您进行可能冗长的评估时,您应始终通过其中一种…Async方法进行链接.鉴于在使用没有"... Async"后缀的方法时没有对执行线程的控制(它也可能是调用链接方法的线程或任何其他调用"完成方法"的线程,请参阅此答案),这是你应该做什么.