为什么我的 CompletableFuture 代码在 Java 8 中运行而不在 Java 11 中运行?

Mis*_* D. 7 java multithreading asynchronous completable-future java-11

为什么这段代码在 Java 8 和 Java 11 中表现不同?

private static String test2() {
    CompletableFuture
            .runAsync(() -> IntStream.rangeClosed(1, 20).forEach(x -> {
                try {
                    Thread.sleep(500);
                    System.out.println(x);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }));

    return "Finish";
}
Run Code Online (Sandbox Code Playgroud)

我希望它打印完成,然后以 500 毫秒的间隔打印从 1 到 20 的数字,然后停止执行,它在 Java 8 中正常工作。

然而,当我在 Java 11 上运行完全相同的方法时,它打印 Finish 并终止,而没有调用 runAsync(...) 代码。我设法通过添加这样的 ExecutorService 来启动它

private static String test2() {

    final ExecutorService executorService = Executors.newFixedThreadPool(10);
    CompletableFuture
            .runAsync(() -> IntStream.rangeClosed(1, 10).forEach(x -> {
                try {
                    Thread.sleep(500);
                    System.out.println(x);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }), executorService);

    return "Finish";
}
Run Code Online (Sandbox Code Playgroud)

现在它被执行了,但没有完成;它达到 10,之后就没有完成就坐着。我想出了如何通过executorService.shutdown();在返回前调用来停止执行,但我 100% 确定这种方法是错误的,因为通常我会为许多方法使用相同的 executorService,如果我关闭它,其他方法也将无法执行.

Java 8 和 Java 11 之间发生了什么变化,为什么我现在必须添加显式执行器服务,最重要的是如何正确完成方法执行?

mne*_*rov 6

TL;DR -ForkJoinPool.commonPool().awaitQuiescence(1000, TimeUnit.SECONDS); 在您调用CompletableFuture.runAsync 代码之后和代码末尾添加,这样System.exit就不会停止您的可运行。这样你就会得到你的行为。


更长的答案:

好的,首先,我在 Oracles java 8、OpenJDK 8 和 OpenJDK 11 中尝试了这两个示例。全面一致的行为,所以我的回答是,在不同 Java 版本的这些实现中没有任何改变会导致这种差异。在这两个示例中,您看到的行为与 Java 告诉您的行为一致。

从文档 CompletableFuture.runAsync

返回一个新的 CompletableFuture,ForkJoinPool.commonPool()它在运行给定操作后由运行在 中的任务异步完成。

好的...让我们看看ForkJoinPool.commonPool会告诉我们什么(强调我的):

返回公共池实例。这个池是静态构建的;它的运行状态不受尝试shutdown()或 的影响shutdownNow()然而,这个池和任何正在进行的处理都会在程序后自动终止System.exit(int)。任何依赖异步任务处理在程序终止之前完成的程序都应该commonPool().awaitQuiescence在退出之前调用, 。

啊哈,所以这就是为什么我们在使用公共池时没有看到倒计时,因为公共池会在系统退出时终止,这正是我们从方法返回并退出程序时发生的情况(假设您的示例是真的像你展示的那样简单......就像在main......无论如何)

那么为什么自定义执行器会起作用呢?因为,正如您已经注意到的,该执行程序尚未终止。后台还有一段代码在运行,虽然很闲,那Java没有停下来的权力。


那么我们现在能做什么呢?

一种选择是做我们自己的执行程序并在我们完成后将其关闭,就像您建议的那样。我认为这种方法毕竟使用起来并不是那么糟糕。

第二种选择是遵循 java doc 所说的。

任何依赖异步任务处理在程序终止之前完成的程序都应该commonPool().awaitQuiescence在退出之前调用, 。

public boolean awaitQuiescence?(长时间超时,TimeUnit 单位)

如果由在此池中运行的 ForkJoinTask 调用,则等效于 ForkJoinTask.helpQuiesce()。否则,等待和/或尝试协助执行任务,直到此池 isQuiescent() 或指示的超时过去。

因此,我们可以调用该方法并为公共池中的所有公共进程指定超时。我的观点是,这在某种程度上是特定业务的,因为现在您必须回答这个问题 -现在超时到底应该是什么??.

第三种选择是使用CompletableFutures的力量并将此runAsync方法提升到变量:

CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> ...
...
...
bla bla bla code bla bla bla
...
...
voidCompletableFuture.join();
// or if you want to handle exceptions, use get
voidCompletableFuture.get();
Run Code Online (Sandbox Code Playgroud)

然后就在您需要它时,您join()/get()可以将需要的任何东西作为返回值。我最喜欢这个,因为代码像这样最干净和易懂。此外,我可以随心所欲地链接我的 CF,并用它们做一些时髦的事情。


在这种情况下,你不需要一个返回值,不需要其他人做任何事情,只是想从1到20的计数的字符串和异步处理的简单回归,然后就推ForkJoinPool.commonPool().awaitQuiescence(1000, TimeUnit.SECONDS);在方便的地方给你,并给它一些荒谬的超时,从而保证您将退出所有空闲进程。

  • @Kaplan `join()` 不会抛出*已检查*异常。它仍然会抛出异常,即“CancellationException”或“CompletionException”,但您不需要捕获它们。 (2认同)