如果使用thenApply,则CompletableFuture#whenComplete未被调用

kaq*_*qao 6 java concurrency multithreading java-8 completable-future

我有以下代码(由我之前的问题得出),它在远程服务器上安排任务,然后使用轮询完成ScheduledExecutorService#scheduleAtFixedRate.任务完成后,会下载结果.我想将a返回Future给调用者,以便他们可以决定阻止的时间和时间,并为他们提供取消任务的选项.

我的问题是,如果客户端取消方法Future返回的download,whenComplete则不执行块.如果我删除thenApply它.很明显,我对Future构图有些误解......我应该改变什么?

public Future<Object> download(Something something) {
    String jobId = schedule(something);
    CompletableFuture<String> job = pollForCompletion(jobId);
    return job.thenApply(this::downloadResult);
}

private CompletableFuture<String> pollForCompletion(String jobId) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    CompletableFuture<String> completionFuture = new CompletableFuture<>();

    ScheduledFuture<?> checkFuture = executor.scheduleAtFixedRate(() -> {           
            if (pollRemoteServer(jobId).equals("COMPLETE")) {
                completionFuture.complete(jobId);
            }
    }, 0, 10, TimeUnit.SECONDS);
    completionFuture                
            .whenComplete((result, thrown) -> {
                System.out.println("XXXXXXXXXXX"); //Never happens unless thenApply is removed
                checkFuture.cancel(true);
                executor.shutdown();
            });
    return completionFuture;
}
Run Code Online (Sandbox Code Playgroud)

同样,如果我这样做:

return completionFuture.whenComplete(...)
Run Code Online (Sandbox Code Playgroud)

代替

completionFuture.whenComplete(...);
return completionFuture;
Run Code Online (Sandbox Code Playgroud)

whenComplete也永远不会执行.这对我来说似乎非常违反直觉.应该不是逻辑上Future的返回whenComplete是一个我应该继续保留?

编辑:

我更改了代码以显式反向传播取消.这是令人憎恶和难以理解的,但它有效,我找不到更好的方法:

public Future<Object> download(Something something) throws ChartDataGenException, Exception {
        String jobId = schedule(something);
        CompletableFuture<String> job = pollForCompletion(jobId);
        CompletableFuture<Object> resulting = job.thenApply(this::download);
        resulting.whenComplete((result, thrown) -> {
            if (resulting.isCancelled()) { //the check is not necessary, but communicates the intent better
                job.cancel(true);
            }
        });
        return resulting;
}
Run Code Online (Sandbox Code Playgroud)

编辑2:

我发现了tascalate-concurrent,一个很棒的库提供了一个理智的实现CompletionStage,支持依赖的promises(通过DependentPromise类),可以透明地反向传播取消.对于这个用例来说似乎很完美.

这应该足够了:

DependentPromise
          .from(pollForCompletion(jobId))
          .thenApply(this::download, true); //true means the cancellation should back-propagate
Run Code Online (Sandbox Code Playgroud)

请注意,没有测试这种方法.

Hol*_*ger 5

您的结构如下:

           ????????????????????
           ? completionFuture |
           ????????????????????
             ?              ?
  ????????????????      ?????????????
  ? whenComplete |      ? thenApply |
  ????????????????      ?????????????
Run Code Online (Sandbox Code Playgroud)

因此,当您取消thenApply未来时,原始completionFuture对象不会受到影响,因为它不依赖于thenApply舞台.但是,如果您没有链接thenApply舞台,则返回原始completionFuture实例并取消此阶段会导致取消所有相关阶段,从而whenComplete立即执行操作.

但是当thenApply阶段被取消时,completionFuture仍然可以在pollRemoteServer(jobId).equals("COMPLETE")条件满足时完成,因为轮询不会停止.但是,我们不知道的关系jobId = schedule(something)pollRemoteServer(jobId).如果您的应用程序状态发生变化,导致在取消下载后永远无法满足此条件,则此未来永远不会完成......


关于你的最后一个问题,哪个未来是"我应该坚持的那个?",事实上,没有要求有一个线性的期货链,而便利的方法是CompletableFuture让这个链更容易创建,通常,这是最不实用的事情,因为如果你有一个线性依赖,你可以写一个代码块.你链接两个独立阶段的模型是正确的,但取消并不适用于它,但它也不能通过线性链.

如果您希望能够取消源阶段,则需要对其进行引用,但如果您希望能够获得依赖阶段的结果,则还需要对该阶段的引用.

  • @kaqqao:你不需要轮询`resul.isCancelled()`因为`thrown`将是`CancellationException`然后.但实际上,你根本不需要任何检查.你可以简单地使用`resul.whenComplete((result,thrown) - > job.cancel(true));`当时,函数执行,或者`result`已被取消或`job`已经被执行完成后,无论如何它都会忽略取消. (2认同)
  • @Marko Topolnik:每次我向`ExecutorService`提交作业或者向事件源注册一个监听器时,我都认为它将保留对该对象的引用以履行合同.嗯,实际上,确实不能确保*对象*仍然存在,但只要执行*关联动作*就没有关系,这就是这就是全部.我们希望执行`BiConsumer`的`accept`方法; `whenComplete`返回的`CompletableFuture`是无关紧要的...... (2认同)