返回一个包含 CompletableFuture 列表的 CompletableFuture

pay*_*yne 2 java spring spring-mvc spring-boot completable-future

我正在尝试加快对多个 API 的调用。

下面的代码中,getFilteredEvents是当前的同步版本。我有一种感觉,该map(x -> x.getFilteredEvents(eventResearch))操作将等待每个 API(它RestTemplate.exchange()在内部使用)的响应,然后再传递到下一个 API 以构建List<Event>我想要返回的 API。解决方案可能是map在单独的线程上启动调用,但我想尝试一下CompletableFutureAPI。

因此,这getFilteredEventsFaster是我努力改善响应时间的结果。

@Service
public class EventsResearchService {

    @Autowired
    private List<UniformEventsResearchApi> eventsResearchApis;

    // this works, but I'm trying to improve it
    public EventResearchResponse getFilteredEvents(EventResearch eventResearch) {
        List<Event> eventsList = eventsResearchApis
                .stream()
                .map(x -> x.getFilteredEvents(eventResearch))
                .flatMap(List::stream)
                .collect(Collectors.toList());

        return extractResponse(eventResearch, eventsList);
    }

    // this doesn't work yet: what is wrong?
    public CompletableFuture<List<Event>> getFilteredEventsFaster(EventResearch eventResearch) {
        List<CompletableFuture<List<Event>>> futureEventsList = eventsResearchApis
                .parallelStream()
                .map(x -> CompletableFuture.supplyAsync(() -> x.getFilteredEvents(eventResearch)))
                .collect(Collectors.toList());

        return CompletableFuture.allOf(futureEventsList.toArray(new CompletableFuture<List<Event>>[0]));
    }
}
Run Code Online (Sandbox Code Playgroud)

我的理解是,我想将一个CompletableFuture<List<Event>>返回到我的前端,而不是发送到List<CompletableFuture<List<Event>>>,因此CompletableFuture.allOf()调用(如果我理解正确,它类似于一个flatmap操作,CompletableFuture从多个CompleteableFutures 创建一个)。

不幸的是,事实上,我Generic array creation在使用时遇到编译错误new CompletableFuture<List<Event>>[0]

我究竟做错了什么?

我有一种感觉,使用该join方法确实可以让我收集所有答案,但这将是我的服务线程上的阻塞操作,不是吗?CompletableFuture(如果我理解正确的话,这将违背尝试将 a 返回到我的前端的目的。)

Oli*_*lin 7

以下代码片段显示了如何使用listOfFutures.stream().map(CompletableFuture::join)来收集 的结果allOF我从本页中获取了这个示例,该示例表明它不会等待每个 Future 完成。

class Test {

    public static void main(String[] args) throws Exception {

        long millisBefore = System.currentTimeMillis();

        List<String> strings = Arrays.asList("1","2", "3", "4", "5", "6", "7", "8");
        List<CompletableFuture<String>> listOfFutures = strings.stream().map(Test::downloadWebPage).collect(toList());
        CompletableFuture<List<String>> futureOfList = CompletableFuture
                .allOf(listOfFutures.toArray(new CompletableFuture[0]))
                .thenApply(v ->  listOfFutures.stream().map(CompletableFuture::join).collect(toList()));

        System.out.println(futureOfList.get()); // blocks here
        System.out.printf("time taken : %.4fs\n", (System.currentTimeMillis() - millisBefore)/1000d);
    }

    private static CompletableFuture<String> downloadWebPage(String webPageLink) {
        return CompletableFuture.supplyAsync( () ->{
            try { TimeUnit.SECONDS.sleep(4); }
            catch (Exception io){ throw new RuntimeException(io); }
            finally { return "downloaded : "+ webPageLink; }
            });
    }

}
Run Code Online (Sandbox Code Playgroud)

由于效率似乎是这里的问题,因此我添加了一个虚拟基准来证明执行不需要 32 秒。

输出 :

[downloaded : 1, downloaded : 2, downloaded : 3, downloaded : 4, downloaded : 5, downloaded : 6, downloaded : 7, downloaded : 8]
time taken : 8.0630s
Run Code Online (Sandbox Code Playgroud)

从原始问题海报编辑

感谢这个答案,并通过使用这个网站(讨论与 相关的异常处理allOf),我想出了这个完整的版本:

    public CompletableFuture<List<Event>> getFilteredEventsFaster(EventResearch eventResearch) {

        /* Collecting the list of all the async requests that build a List<Event>. */
        List<CompletableFuture<List<Event>>> completableFutures = eventsResearchApis.stream()
                .map(api -> getFilteredEventsAsync(api, eventResearch))
                .collect(Collectors.toList());

        /* Creating a single Future that contains all the Futures we just created ("flatmap"). */
        CompletableFuture<Void> allFutures =CompletableFuture.allOf(completableFutures
                .toArray(new CompletableFuture[eventsResearchApis.size()]));

        /* When all the Futures have completed, we join them to create merged List<Event>. */
        CompletableFuture<List<Event>> allCompletableFutures = allFutures
                .thenApply(future -> completableFutures.stream()
                            .map(CompletableFuture::join)
                            .flatMap(List::stream) // creating a List<Event> from List<List<Event>>
                            .collect(Collectors.toList())
                );

        return allCompletableFutures;
    }

    private CompletableFuture<List<Event>> getFilteredEventsAsync(UniformEventsResearchApi api,
            EventResearch eventResearch) {
        /* Manage the Exceptions here to ensure the wrapping Future returns the other calls. */
        return CompletableFuture.supplyAsync(() -> api.getFilteredEvents(eventResearch))
                .exceptionally(ex -> {
                    LOGGER.error("Extraction of events from API went wrong: ", ex);
                    return Collections.emptyList(); // gets managed in the wrapping Future
                });
    }
Run Code Online (Sandbox Code Playgroud)