在java中,我如何处理CompletableFutures并获得第一个完成的理想结果?

Hap*_*eer 5 java concurrency java-8 completable-future

通常使用CompletableFuture,我会在结果可用时立即调用thenApply或其他方法来执行某些操作.但是,我现在有一种情况,我希望处理结果,直到我收到一个肯定的结果,然后忽略所有进一步的结果.

如果我只想获得第一个可用的结果,我可以使用CompletableFuture.anyOf(虽然我讨厌将列表转换为数组只是为了调用anyOf).但这不是我想要的.我想取第一个结果,如果它没有理想的结果,那么我想处理第二个可用结果,依此类推,直到得到理想的结果.

这是一个简单的例子,它遍历所有结果并返回它找到的大于9的第一个值.(注意,这不是我真正的任务.这只是一个简单的例子.)

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    for(CompletableFuture<Integer> result : results) {
        Integer v = result.get();
        if(v > 9)
            return v;
    }
    return null;
}
Run Code Online (Sandbox Code Playgroud)

当然,这个例子从一开始就经历了结果,而不是在完成时查看结果.所以这里有一个可以实现我想要的东西,但代码要复杂得多.

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    AtomicInteger finalResult = new AtomicInteger();
    CountDownLatch latch = new CountDownLatch(results.size());
    for(CompletableFuture<Integer> result : results) {
        result.whenComplete((v,e) -> {
            if(e!=null) {
                Logger.getLogger(getClass()).error("",e);
            } else if(v > 9) {
                finalResult.set(v);
                while(latch.getCount() > 0)
                    latch.countDown();
                return;
            }
            latch.countDown();
        });
    }
    latch.await();

    if(finalResult.get() > 9)
        return finalResult.get();
    return null;
}    
Run Code Online (Sandbox Code Playgroud)

有没有api,我可以这样做?

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    Iterator<Integer> resultIt = getResultsAsAvailable(results);
    for(; resultIt.hasNext();) {
        Integer v = resultIt.next();
        if(v > 9)
            return v;
    }
    return null;
}
Run Code Online (Sandbox Code Playgroud)

甚至更好:

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    return getFirstMatch(results, r -> {return r > 9;});
}
Run Code Online (Sandbox Code Playgroud)

Sav*_*ior 5

我不知道 JDK 或其他地方有任何这样的 API。你可以自己动手。

如果 future 已经完成,您可以利用CompletableFuture#complete(和) 不执行任何操作的事实。completeExceptionally

如果尚未完成,则将get()相关方法返回的值设置为给定值。

创建新的最终结果 CompletableFuture。如果您的条件适用,请向您的每个尝试添加complete最终结果的延续。那个未来将随着第一次的成功而结束。但是,如果没有成功,您显然需要null一个结果。您可以创建一个CompletableFuturewithallOf来尝试使用 来获得complete最终结果null

就像是

public static <T> CompletableFuture<T> firstOrNull(List<CompletableFuture<T>> futures, Predicate<T> condition) {
    CompletableFuture<T> finalResult = new CompletableFuture<>();
    // attempt to complete on success
    futures.stream().forEach(future -> future.thenAccept(successResult -> {
        if (condition.test(successResult))
            finalResult.complete(successResult);
    }));
    CompletableFuture<?> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    all.thenRun(() -> {
        finalResult.complete(null);
    });
    return finalResult;
}
Run Code Online (Sandbox Code Playgroud)

您需要支付无操作调用的开销。

您可以null根据需要更改为某些默认值或以不同的方式处理异常(completeExceptionally一旦发生错误)。您必须使用whenCompletehandle 代替thenAccept上面的内容才能访问Exception.


Hol*_*ger 5

您可以使用以下解决方案:

\n\n
public static <T> CompletableFuture<T> anyMatch(\n    List<? extends CompletionStage<? extends T>> l, Predicate<? super T> criteria) {\n\n    CompletableFuture<T> result=new CompletableFuture<>();\n    Consumer<T> whenMatching=v -> { if(criteria.test(v)) result.complete(v); };\n    CompletableFuture.allOf(l.stream()\n        .map(f -> f.thenAccept(whenMatching)).toArray(CompletableFuture<?>[]::new))\n    .whenComplete((ignored, t) ->\n        result.completeExceptionally(t!=null? t: new NoSuchElementException()));\n    return result;\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

基本原理与Pillar\xe2\x80\x99s 答案中的相同,但是存在一些差异:

\n\n
    \n
  • 通用签名更加灵活。
  • \n
  • 所需数组的创建CompletableFuture.allOf与源期货后续操作的注册相结合。作为副作用,操作的处理程序allOf依赖于完成结果的所有尝试的完成,而不仅仅是原始的 future。这使得实际所需的依赖关系变得明确。这样,当我们将所有thenAccepts 替换为thenAcceptAsyncs 时,它甚至可以工作。
  • \n
  • 该解决方案以 a 完成,NoSuchElementException而不是null在没有结果满足条件的情况下返回。如果至少一个 future 异常完成,并且没有成功完成且结果匹配,则中继发生的异常之一。
  • \n
\n\n

你可以尝试一下

\n\n
List<CompletableFuture<Integer>> list=Arrays.asList(\n    CompletableFuture.supplyAsync(()->5),\n    CompletableFuture.supplyAsync(()->{throw new RuntimeException(); }),\n    CompletableFuture.supplyAsync(()->42),\n    CompletableFuture.completedFuture(0)\n);\nanyMatch(list, i -> i>9)\n    .thenAccept(i->System.out.println("got "+i))\n    // optionally chain with:\n    .whenComplete((x,t)->{ if(t!=null) t.printStackTrace(); });\n
Run Code Online (Sandbox Code Playgroud)\n