如何扇出链式CompletableFuture?

use*_*879 4 parallel-processing java-8 completable-future

我想链接一个CompletableFuture,以便它在处理过程中呈扇形散开。我的意思是我对列表有一个开放的CompletableFuture,并且我想对列表中的每个项目应用计算。

第一步是调用m_myApi.getResponse(request,executor)发出异步调用。

该异步调用的结果具有getCandidates方法。我想同时解析所有这些候选人。

目前,我的代码按顺序解析它们

public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(@Nonnull final REQUEST request, @Nonnull final Executor executor)
{
        CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
        return candidates.thenApplyAsync(response -> response.getCandidates()
                                                   .stream()
                                                   .map(MyParser::ParseCandidates)
                                                   .collect(Collectors.toList()));
}
Run Code Online (Sandbox Code Playgroud)

我想要这样的东西:

public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(@Nonnull final REQUEST request, @Nonnull final Executor executor)
{
        CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
        return candidates.thenApplyAsync(response -> response.getCandidates()
                                                   .stream()
                                                   .PARSE_IN_PARALLEL_USING_EXECUTOR
}
Run Code Online (Sandbox Code Playgroud)

Hol*_*ger 5

就像在这个答案中说的那样,如果Executor碰巧是一个Fork / Join池,则存在(未记录的)功能,即在其工作线程之一中开始并行流将使用该执行程序执行并行操作。

当您想支持任意Executor实现时,事情就更加复杂了。一种解决方案看起来像

public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(
       @Nonnull final REQUEST request, @Nonnull final Executor executor)
{
    CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
    return candidates.thenComposeAsync(
        response -> {
            List<CompletableFuture<DOMAIN_OBJECT>> list = response.getCandidates()
                .stream()
                .map(CompletableFuture::completedFuture)
                .map(f -> f.thenApplyAsync(MyParser::ParseCandidates, executor))
                .collect(Collectors.toList());
            return CompletableFuture.allOf(list.toArray(new CompletableFuture<?>[0]))
                .thenApplyAsync(x ->
                    list.stream().map(CompletableFuture::join).collect(Collectors.toList()),
                    executor);
        },
        executor);
}
Run Code Online (Sandbox Code Playgroud)

首先至关重要的是,我们必须在开始等待任何异步作业之前提交所有可能的异步作业,以实现执行程序可能支持的最大并行度。因此,我们必须首先收集所有期货List

在第二步中,我们可以遍历列表和join所有期货。如果执行程序是Fork / Join池,并且将来还没有完成,它将检测到该错误并启动补偿线程以重新获得配置的并行度。但是,对于任意执行者,我们不能假定这样的功能。最值得注意的是,如果执行程序是单线程执行程序,则可能导致死锁。

因此,CompletableFuture.allOf仅当所有期货都已完成时,该解决方案才用于执行迭代和加入所有期货的操作。因此,此解决方案永远不会阻塞执行者的线程,使其与任何Executor实现兼容。