Luk*_*kas 17 java multithreading java-8 completable-future
我正在玩Java 8可完成的期货.我有以下代码:
CountDownLatch waitLatch = new CountDownLatch(1);
CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
try {
System.out.println("Wait");
waitLatch.await(); //cancel should interrupt
System.out.println("Done");
} catch (InterruptedException e) {
System.out.println("Interrupted");
throw new RuntimeException(e);
}
});
sleep(10); //give it some time to start (ugly, but works)
future.cancel(true);
System.out.println("Cancel called");
assertTrue(future.isCancelled());
assertTrue(future.isDone());
sleep(100); //give it some time to finish
Run Code Online (Sandbox Code Playgroud)
使用runAsync我计划执行等待锁存器的代码.接下来我取消了未来,期望被抛入中断的异常.但似乎线程在await调用上仍然被阻塞,即使未来被取消(断言传递),也不会抛出InterruptedException.使用ExecutorService的等效代码按预期工作.它是CompletableFuture中的错误还是我的示例中的错误?
nos*_*sid 13
显然,这是故意的.CompletableFuture :: cancel方法的Javadoc 状态:
[参数:] mayInterruptIfRunning -这个值没有在该实施方式的效果,因为中断将不被用于控制处理.
有趣的是,ForkJoinTask :: cancel方法对参数mayInterruptIfRunning使用几乎相同的措辞.
我猜这个问题:
CompletableFuture应该创建一个新的CompletionStage,而不是阻塞,而cpu绑定任务是fork-join模型的先决条件.因此,使用其中任何一个中断都会失败他们的目的.而另一方面,它可能会增加复杂性,如果按预期使用则不需要.
如果您确实希望能够取消任务,那么您必须使用Future其自身(例如,由ExecutorService.submit(Callable<T>),而不是返回。正如nosidCompletableFuture在答案中指出的那样,完全忽略对 的任何调用。CompletableFuturecancel(true)
我怀疑 JDK 团队没有实现中断,因为:
InputStream.read()阻塞调用!(JDK 团队没有计划让标准 I/O 系统再次可中断,就像 Java 早期那样。)Object.finalize()、Object.wait()、Thread.stop()等。我相信Thread.interrupt()它们属于最终必须弃用和替换的类别。因此,较新的 API(例如ForkJoinPool和CompletableFuture)已经不支持它。CompletableFuture旨在构建 DAG 结构的操作管道,类似于 Java StreamAPI。很难简洁地描述数据流 DAG 的一个节点的中断如何影响 DAG 其余部分的执行。(当任何节点中断时,是否应该立即取消所有并发任务?)解决这个问题的一种非常巧妙的方法是让每个人将CompletableFuture对其自身的引用导出到外部可见的AtomicReference,然后Thread在需要时可以从另一个外部线程直接中断该引用。ExecutorService或者,如果您使用自己的任务启动所有任务ThreadPool,则可以手动中断已启动的任何或所有线程,即使CompletableFuture拒绝通过cancel(true). (请注意,虽然CompletableFuturelambda 不能抛出已检查的异常,所以如果您在 a 中有可中断的等待CompletableFuture,则必须作为未检查的异常重新抛出。)
AtomicReference<Boolean> cancel = new AtomicReference<>()更简单地说,您可以在外部作用域中声明 an ,并定期从每个CompletableFuture任务的 lambda 内部检查此标志。
您还可以尝试设置Future实例的 DAG 而不是CompletableFuture实例的 DAG,这样您就可以准确指定任何一项任务中的异常和中断/取消应如何影响其他当前正在运行的任务。我在我的问题中的示例代码中展示了如何执行此操作,并且效果很好,但有很多样板文件。
当您致电时CompletableFuture#cancel,您只会停止链的下游部分.上游部分,即最终会调用的东西,complete(...)或者completeExceptionally(...)不会得到任何不再需要结果的信号.
什么是'上游'和'下游'的东西?
我们考虑以下代码:
CompletableFuture
.supplyAsync(() -> "hello") //1
.thenApply(s -> s + " world!") //2
.thenAccept(s -> System.out.println(s)); //3
Run Code Online (Sandbox Code Playgroud)
在这里,数据从上到下流动 - 从供应商创建,到功能修改,再到消费println.上述特定步骤的部分称为上游部分,下部部分称为下游部分.E. g.步骤1和2是步骤3的上游.
这是幕后发生的事情.这不是精确的,而是一个方便的思维模型.
ForkJoinPool).complete(...)到下一个CompletableFuture下游.CompletableFuture调用下一步 - 一个函数(步骤2),它接收前一步结果并返回将进一步传递给下游CompletableFuture的东西complete(...).CompletableFuture调用消费者,System.out.println(s).消费者完成后,下游CompletableFuture将收到它的价值,(Void) null正如我们所看到的,CompletableFuture这个链中的每个人都必须知道下游有谁在等待将值传递给他们complete(...)(或completeExceptionally(...)).但是,它CompletableFuture不必知道它的上游(或上游 - 可能有几个).
因此,调用cancel()步骤3不会中止步骤1和2,因为从步骤3到步骤2没有链接.
假设如果你正在使用CompletableFuture那么你的步骤足够小,这样如果执行一些额外的步骤就没有坏处.
如果要将取消传播到上游,您有两种选择:
CompletableFuture(名称就好cancelled),在每一步之后检查(类似step.applyToEither(cancelled, Function.identity()))| 归档时间: |
|
| 查看次数: |
11173 次 |
| 最近记录: |