Java 8 使用 CompletableFuture::join 维护流顺序

pre*_*ash 3 java java-8 java-stream completable-future

我有一个异步执行的查询输入流。我想确保当我使用时Completablefuture::join,这些要求的结果是按照输入查询流的顺序收集的。

这是我的代码的样子:

queries.stream()
     .map(query -> CompletableFuture.supplyAsync(() -> {
                    try {
                        return SQLQueryEngine.execute(query);
                    } catch (InternalErrorException e) {
                        throw new RuntimeException(e);
                    }
     }))
     .map(CompletableFuture::join)
     .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

SQLQueryEngine.execute(查询); 返回一个List<Results>so 输出是List<List<Result>。我想展平并将所有结果合并到一个列表中。如果我在收集之前使用 .flatMap(List::stream) 来扁平化,它会保持排序吗?

Mis*_*sha 7

您可能的意思.flatMap是,是的,它将保留顺序。

考虑显式传递一个ExecutortosupplyAsync以避免在ForkJoinPool.commonPool()

正如@Ruben指出的那样,您在提交后立即加入当前线程中的每个任务,并提交下一个查询之前,这可能是一个错误。您应该先提交所有查询,然后才开始加入。

您可以这样做(使用静态导入toList):

queries.stream()
    .map(query -> CompletableFuture.supplyAsync(...))
    .collect(toList())
    .stream()
    .map(CompletableFuture::join)
    .collect(toList());
Run Code Online (Sandbox Code Playgroud)

  • 显然,由于咖啡因摄入量不足,我的讽刺探测器处于离线状态。我认为最好将@Ruben 的评论纳入答案。 (2认同)