如何取消Java 8可完成的未来?

Luk*_*kas 17 java multithreading java-8 completable-future

我正在玩Java 8可完成的期货.我有以下代码:

CountDownLatch waitLatch = new CountDownLatch(1);

CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
    try {
        System.out.println("Wait");
        waitLatch.await(); //cancel should interrupt
        System.out.println("Done");
    } catch (InterruptedException e) {
        System.out.println("Interrupted");
        throw new RuntimeException(e);
    }
});

sleep(10); //give it some time to start (ugly, but works)
future.cancel(true);
System.out.println("Cancel called");

assertTrue(future.isCancelled());

assertTrue(future.isDone());
sleep(100); //give it some time to finish
Run Code Online (Sandbox Code Playgroud)

使用runAsync我计划执行等待锁存器的代码.接下来我取消了未来,期望被抛入中断的异常.但似乎线程在await调用上仍然被阻塞,即使未来被取消(断言传递),也不会抛出InterruptedException.使用ExecutorService的等效代码按预期工作.它是CompletableFuture中的错误还是我的示例中的错误?

nos*_*sid 13

显然,这是故意的.CompletableFuture :: cancel方法的Javadoc 状态:

[参数:] mayInterruptIfRunning -这个值没有在该实施方式的效果,因为中断将不被用于控制处理.

有趣的是,ForkJoinTask :: cancel方法对参数mayInterruptIfRunning使用几乎相同的措辞.

我猜这个问题:

  • 中断旨在用于阻塞操作,如睡眠,等待或I/O操作,
  • CompletableFutureForkJoinTask都不打算与阻塞操作一起使用.

CompletableFuture应该创建一个新的CompletionStage,而不是阻塞,而cpu绑定任务是fork-join模型的先决条件.因此,使用其中任何一个中断都会失败他们的目的.而另一方面,它可能会增加复杂性,如果按预期使用则不需要.


Luk*_*son 9

如果您确实希望能够取消任务,那么您必须使用Future其自身(例如,由ExecutorService.submit(Callable<T>),而不是返回。正如nosidCompletableFuture在答案中指出的那样,完全忽略对 的任何调用。CompletableFuturecancel(true)

我怀疑 JDK 团队没有实现中断,因为:

  1. 中断总是很棘手,人们很难理解,也很难处理。Java I/O 系统甚至是不可中断的,尽管调用是InputStream.read()阻塞调用!(JDK 团队没有计划让标准 I/O 系统再次可中断,就像 Java 早期那样。)
  2. JDK 团队一直在非常努力地淘汰早期 Java 时代中旧的、损坏的 API,例如Object.finalize()Object.wait()Thread.stop()等。我相信Thread.interrupt()它们属于最终必须弃用和替换的类别。因此,较新的 API(例如ForkJoinPoolCompletableFuture)已经不支持它。
  3. CompletableFuture旨在构建 DAG 结构的操作管道,类似于 Java StreamAPI。很难简洁地描述数据流 DAG 的一个节点的中断如何影响 DAG 其余部分的执行。(当任何节点中断时,是否应该立即取消所有并发任务?)
  4. 我怀疑 JDK 团队只是不想正确处理中断问题,因为 JDK 和库如今已经达到了内部复杂程度。(lambda 系统的内部结构——呃。)

解决这个问题的一种非常巧妙的方法是让每个人将CompletableFuture对其自身的引用导出到外部可见的AtomicReference,然后Thread在需要时可以从另一个外部线程直接中断该引用。ExecutorService或者,如果您使用自己的任务启动所有任务ThreadPool,则可以手动中断已启动的任何或所有线程,即使CompletableFuture拒绝通过cancel(true). (请注意,虽然CompletableFuturelambda 不能抛出已检查的异常,所以如果您在 a 中有可中断的等待CompletableFuture,则必须作为未检查的异常重新抛出。)

AtomicReference<Boolean> cancel = new AtomicReference<>()更简单地说,您可以在外部作用域中声明 an ,并定期从每个CompletableFuture任务的 lambda 内部检查此标志。

您还可以尝试设置Future实例的 DAG 而不是CompletableFuture实例的 DAG,这样您就可以准确指定任何一项任务中的异常和中断/取消应如何影响其他当前正在运行的任务。我在我的问题中的示例代码中展示了如何执行此操作,并且效果很好,但有很多样板文件。


Kir*_*kov 8

当您致电时CompletableFuture#cancel,您只会停止链的下游部分.上游部分,即最终会调用的东西,complete(...)或者completeExceptionally(...)不会得到任何不再需要结果的信号.

什么是'上游'和'下游'的东西?

我们考虑以下代码:

CompletableFuture
        .supplyAsync(() -> "hello")               //1
        .thenApply(s -> s + " world!")            //2
        .thenAccept(s -> System.out.println(s));  //3
Run Code Online (Sandbox Code Playgroud)

在这里,数据从上到下流动 - 从供应商创建,到功能修改,再到消费println.上述特定步骤的部分称为上游部分,下部部分称为下游部分.E. g.步骤1和2是步骤3的上游.

这是幕后发生的事情.这不是精确的,而是一个方便的思维模型.

  1. 正在执行供应商(步骤1)(在JVM的常见内部ForkJoinPool).
  2. 供应商的结果然后被传递complete(...)到下一个CompletableFuture下游.
  3. 在接收到结果时,CompletableFuture调用下一步 - 一个函数(步骤2),它接收前一步结果并返回将进一步传递给下游CompletableFuture的东西complete(...).
  4. 在收到步骤2结果后,步骤3 CompletableFuture调用消费者,System.out.println(s).消费者完成后,下游CompletableFuture将收到它的价值,(Void) null

正如我们所看到的,CompletableFuture这个链中的每个人都必须知道下游有谁在等待将值传递给他们complete(...)(或completeExceptionally(...)).但是,它CompletableFuture不必知道它的上游(或上游 - 可能有几个).

因此,调用cancel()步骤3不会中止步骤1和2,因为从步骤3到步骤2没有链接.

假设如果你正在使用CompletableFuture那么你的步骤足够小,这样如果执行一些额外的步骤就没有坏处.

如果要将取消传播到上游,您有两种选择:

  • 自己实现 - 创建一个专用CompletableFuture(名称就好cancelled),在每一步之后检查(类似step.applyToEither(cancelled, Function.identity()))
  • 使用反应堆栈,如RxJava 2,ProjectReactor/Flux或Akka Streams