如何使用ExecutorService进行轮询直到结果到达

kaq*_*qao 6 java concurrency future executorservice completable-future

我有一种情况,我必须轮询远程服务器以检查任务是否已完成。完成后,我将进行另一个调用以检索结果。

我最初认为我应该使用SingleThreadScheduledExecutorwith scheduleWithFixedDelay进行轮询:

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId), 0, 10, TimeUnit.SECONDS);

public void poll(String jobId) {
   boolean jobDone = remoteServer.isJobDone(jobId);
   if (jobDone) {
       retrieveJobResult(jobId);
   }
}
Run Code Online (Sandbox Code Playgroud)

但是,因为我只能提供RunnablescheduleWithFixedDelay不能返回任何东西,我不明白时,future就完成了,如果有的话。打电话future.get()甚至意味着什么?我在等什么结果?

第一次检测到远程任务完成时,我想执行一个不同的远程调用并将其结果设置为的值future。我认为我可以为此使用CompletableFuture,以便将其转发给我的poll方法,然后将其转发给retrieveTask最终将完成它的方法:

CompletableFuture<Object> result = new CompletableFuture<Object>();
ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId, result), 0, 10, TimeUnit.SECONDS);

public void poll(String jobId, CompletableFuture<Object> result) {
   boolean jobDone = remoteServer.isJobDone(jobId);
   if (jobDone) {
       retrieveJobResult(jobId, result);
   }
}

public void retrieveJobResult(String jobId, CompletableFuture<Object> result) {
    Object remoteResult = remoteServer.getJobResult(jobId);
    result.complete(remoteResult);
}
Run Code Online (Sandbox Code Playgroud)

但这有很多问题。首先,CompletableFuture似乎甚至都不打算用于这种用途。相反,我应该按照CompletableFuture.supplyAsync(() -> poll(jobId))我的想法去做,但是当我被取消/完成时,我该如何正确关闭executor并取消future返回的返回值CompletableFuture呢?感觉轮询应该以完全不同的方式实施。

And*_*ert 10

我认为 CompletableFutures 是一个很好的方法来做到这一点:

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

private void run() {
    final Object jobResult = pollForCompletion("jobId1")
            .thenApply(jobId -> remoteServer.getJobResult(jobId))
            .get();

}

private CompletableFuture<String> pollForCompletion(String jobId) {
    CompletableFuture<String> completionFuture = new CompletableFuture<>();
    final ScheduledFuture<Void> checkFuture = executor.scheduleAtFixedRate(() -> {
        if (remoteServer.isJobDone(jobId)) {
            completionFuture.complete(jobId);
        }
    }, 0, 10, TimeUnit.SECONDS);
    completionFuture.whenComplete((result, thrown) -> {
        checkFuture.cancel(true);
    });
    return completionFuture;
}
Run Code Online (Sandbox Code Playgroud)