Hap*_*eer 5 java concurrency java-8 completable-future
通常使用CompletableFuture,我会在结果可用时立即调用thenApply或其他方法来执行某些操作.但是,我现在有一种情况,我希望处理结果,直到我收到一个肯定的结果,然后忽略所有进一步的结果.
如果我只想获得第一个可用的结果,我可以使用CompletableFuture.anyOf(虽然我讨厌将列表转换为数组只是为了调用anyOf).但这不是我想要的.我想取第一个结果,如果它没有理想的结果,那么我想处理第二个可用结果,依此类推,直到得到理想的结果.
这是一个简单的例子,它遍历所有结果并返回它找到的大于9的第一个值.(注意,这不是我真正的任务.这只是一个简单的例子.)
public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
for(CompletableFuture<Integer> result : results) {
Integer v = result.get();
if(v > 9)
return v;
}
return null;
}
Run Code Online (Sandbox Code Playgroud)
当然,这个例子从一开始就经历了结果,而不是在完成时查看结果.所以这里有一个可以实现我想要的东西,但代码要复杂得多.
public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
AtomicInteger finalResult = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(results.size());
for(CompletableFuture<Integer> result : results) {
result.whenComplete((v,e) -> {
if(e!=null) {
Logger.getLogger(getClass()).error("",e);
} else if(v > 9) {
finalResult.set(v);
while(latch.getCount() > 0)
latch.countDown();
return;
}
latch.countDown();
});
}
latch.await();
if(finalResult.get() > 9)
return finalResult.get();
return null;
}
Run Code Online (Sandbox Code Playgroud)
有没有api,我可以这样做?
public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
Iterator<Integer> resultIt = getResultsAsAvailable(results);
for(; resultIt.hasNext();) {
Integer v = resultIt.next();
if(v > 9)
return v;
}
return null;
}
Run Code Online (Sandbox Code Playgroud)
甚至更好:
public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
return getFirstMatch(results, r -> {return r > 9;});
}
Run Code Online (Sandbox Code Playgroud)
我不知道 JDK 或其他地方有任何这样的 API。你可以自己动手。
如果 future 已经完成,您可以利用CompletableFuture#complete(和) 不执行任何操作的事实。completeExceptionally
如果尚未完成,则将
get()相关方法返回的值设置为给定值。
创建新的最终结果 CompletableFuture。如果您的条件适用,请向您的每个尝试添加complete最终结果的延续。那个未来将随着第一次的成功而结束。但是,如果没有成功,您显然需要null一个结果。您可以创建一个CompletableFuturewithallOf来尝试使用 来获得complete最终结果null。
就像是
public static <T> CompletableFuture<T> firstOrNull(List<CompletableFuture<T>> futures, Predicate<T> condition) {
CompletableFuture<T> finalResult = new CompletableFuture<>();
// attempt to complete on success
futures.stream().forEach(future -> future.thenAccept(successResult -> {
if (condition.test(successResult))
finalResult.complete(successResult);
}));
CompletableFuture<?> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
all.thenRun(() -> {
finalResult.complete(null);
});
return finalResult;
}
Run Code Online (Sandbox Code Playgroud)
您需要支付无操作调用的开销。
您可以null根据需要更改为某些默认值或以不同的方式处理异常(completeExceptionally一旦发生错误)。您必须使用whenComplete或handle 代替thenAccept上面的内容才能访问Exception.
您可以使用以下解决方案:
\n\npublic static <T> CompletableFuture<T> anyMatch(\n List<? extends CompletionStage<? extends T>> l, Predicate<? super T> criteria) {\n\n CompletableFuture<T> result=new CompletableFuture<>();\n Consumer<T> whenMatching=v -> { if(criteria.test(v)) result.complete(v); };\n CompletableFuture.allOf(l.stream()\n .map(f -> f.thenAccept(whenMatching)).toArray(CompletableFuture<?>[]::new))\n .whenComplete((ignored, t) ->\n result.completeExceptionally(t!=null? t: new NoSuchElementException()));\n return result;\n}\nRun Code Online (Sandbox Code Playgroud)\n\n基本原理与Pillar\xe2\x80\x99s 答案中的相同,但是存在一些差异:
\n\nCompletableFuture.allOf与源期货后续操作的注册相结合。作为副作用,操作的处理程序allOf依赖于完成结果的所有尝试的完成,而不仅仅是原始的 future。这使得实际所需的依赖关系变得明确。这样,当我们将所有thenAccepts 替换为thenAcceptAsyncs 时,它甚至可以工作。NoSuchElementException而不是null在没有结果满足条件的情况下返回。如果至少一个 future 异常完成,并且没有成功完成且结果匹配,则中继发生的异常之一。你可以尝试一下
\n\nList<CompletableFuture<Integer>> list=Arrays.asList(\n CompletableFuture.supplyAsync(()->5),\n CompletableFuture.supplyAsync(()->{throw new RuntimeException(); }),\n CompletableFuture.supplyAsync(()->42),\n CompletableFuture.completedFuture(0)\n);\nanyMatch(list, i -> i>9)\n .thenAccept(i->System.out.println("got "+i))\n // optionally chain with:\n .whenComplete((x,t)->{ if(t!=null) t.printStackTrace(); });\nRun Code Online (Sandbox Code Playgroud)\n