Java从多次调用中收集CompletableFuture的结果

nib*_*bsa 7 java multithreading completable-future

我必须运行多个外部调用操作,然后以列表的形式获取结果。我决定使用CompletableFutureapi,我准备的代码很恶心:

这个例子:

public class Main {
    public static void main(String[] args) {
        String prefix = "collection_";

        List<CompletableFuture<User>> usersResult = IntStream.range(1, 10)
                .boxed()
                .map(num -> prefix.concat("" + num))
                .map(name -> CompletableFuture.supplyAsync(
                        () -> callApi(name)))
                .collect(Collectors.toList());

        try {
            CompletableFuture.allOf(usersResult.toArray(new CompletableFuture[usersResult.size()])).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        List<User> users = usersResult //the result I need
                .stream()
                .map(userCompletableFuture -> {
                    try {
                        return userCompletableFuture.get();
                    } catch (InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                    return null;
                })
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    private static User callApi(String collection) {
        return new User(); //potentially time-consuming operation
    }
}
Run Code Online (Sandbox Code Playgroud)

我有以下问题:

  1. 我能否以某种方式避免复制try-catch流中的块,我将 CompletableFuture 映射到用户?
  2. 这段代码可以不那么连续吗(我怎样才能避免等待所有期货完成?)
  3. 这样做可以吗(所有的期货都会在流中解决吗?):

    public class Main {
        public static void main(String[] args) {
            String prefix = "collection_";
    
            List<User> usersResult = IntStream.range(1, 10)
                    .boxed()
                    .map(num -> prefix.concat("" + num))
                    .map(name -> CompletableFuture.supplyAsync(
                            () -> callApi(name)))
                    .filter(Objects::nonNull)
                    .map(userCompletableFuture -> {
                        try {
                            return userCompletableFuture.get();
                        } catch (InterruptedException | ExecutionException e) {
                            e.printStackTrace();
                        }
                        return null;
                    })
                    .collect(Collectors.toList());
        }
    
        private static User callApi(String collection) {
            return new User(); //potentially time-consuming operation
        }
    }
    
    Run Code Online (Sandbox Code Playgroud)

Did*_*r L 8

对于 1.,您可以完全跳过allOf().get()电话,因为您无论如何都要一一等待所有期货。¹

对于 2.,您可以try-catch通过执行以下操作来简化:

  • 用于exceptionally()将来直接处理异常;
  • 使用join()而不是get()避免检查异常(并且您知道不可能有异常)。

对于 3.,您不能真正减少顺序,因为您至少需要执行以下步骤:创建所有期货,然后处理它们的结果。

如果你在一个流中做所有事情,它会创建每个未来,然后在创建下一个之前立即等待它——所以你会失去并行性。您可以改用并行流,但使用CompletableFutures不会有太大好处。

所以最终的代码是:

List<CompletableFuture<User>> usersResult = IntStream.range(1, 10)
        .boxed()
        .map(num -> prefix.concat("" + num))
        .map(name -> CompletableFuture.supplyAsync(() -> callApi(name))
            .exceptionally(e -> {
                e.printStackTrace();
                return null;
            }))
        .collect(Collectors.toList());

List<User> users = usersResult
        .stream()
        .map(CompletableFuture::join)
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

¹请注意,allOf()如果您希望结果也为 a CompletableFuture<List<User>>,则仍然需要调用,例如

final CompletableFuture<List<User>> result =
        CompletableFuture.allOf(usersResult.stream().toArray(CompletableFuture[]::new))
                .thenApply(__ -> usersResult
                        .stream()
                        .map(CompletableFuture::join)
                        .filter(Objects::nonNull)
                        .collect(Collectors.toList()));
Run Code Online (Sandbox Code Playgroud)