缩减列表<CompletableFuture<T>>

Jih*_* No 5 java java-stream completable-future

何时ints给出:

List<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

使用Java Stream API,我们可以减少它们

MyValue myvalue = ints
        .parallelStream()
        .map(x -> toMyValue(x))
        .reduce((t, t2) -> t.combine(t2))
        .get();
Run Code Online (Sandbox Code Playgroud)

在这个例子中,对我来说重要的是......

  • 项目将在多个线程中减少
  • 早期映射的项目将提前减少
  • 并非所有结果toMyValue()都会同时加载

现在我想通过API做同样的处理CompletableFuture

为了做地图,我做了:

List<CompeletableFuture<MyValue>> myValueFutures = ints
        .stream()
        .map(x -> CompletableFuture.supplyAsync(() -> toMyValue(x), MY_THREAD_POOL))
        .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

现在我不知道如何减少List<CompeletableFuture<MyValue>> myValueFutures单身MyValue

并行流提供了方便的 API,但由于这些问题我不想使用 Stream API:

  • 并行流在处理过程中很难停止阶段。
  • 当某些worker被IO阻塞时,并行流的活动worker计数可能会超过并行度。这有助于最大限度地提高 CPU 利用率,但可能会出现内存开销(甚至 OOM)。

有什么办法可以减少 CompetableFutures 吗?没有流减少API的一一?

jcc*_*ero 3

基本上你需要等待所有CompletableFutures 的结果,然后组合以获得所需的结果。

有多种方法可以实现此目的,但该类提供了可用于该目的的CompletableFuture方法。allOf

当我必须处理类似的问题时,我喜欢遵循 ​​Tomasz Nurkiewicz 的建议并按以下方式执行此类计算。

正如文章中所建议的,首先,让我们定义以下方便的方法:以可变allOf参数形式接收参数,并且不返回聚合结果的未来;此方法将允许您克服这些缺点,因此您可以传递 aCollection作为参数并返回List实际结果而不是Void.

private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
    CompletableFuture<Void> allDoneFuture =
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
    return allDoneFuture.thenApply(v ->
            futures.stream().
                    map(future -> future.join()).
                    collect(Collectors.<T>toList())
    );
}
Run Code Online (Sandbox Code Playgroud)

通过这种方便的方法,您可以通过以下方法减少您的值:

final CompletableFuture<List<MyValue>> allDone = sequence(myValueFutures);

// Please, see also for alternate approaches
// https://stackoverflow.com/questions/43489281/return-value-directly-from-completablefuture-thenaccept
final List<MyValue> myValues = allDone.join();

final Optional<MyValue> optResult = myValues.stream().
  reduce((t, t2) -> t.combine(t2))
;

// Process the returned value as you consider appropriate
final MyValue result = optResult.get();
Run Code Online (Sandbox Code Playgroud)