nib*_*bsa 7 java multithreading completable-future
我必须运行多个外部调用操作,然后以列表的形式获取结果。我决定使用CompletableFutureapi,我准备的代码很恶心:
这个例子:
public class Main {
public static void main(String[] args) {
String prefix = "collection_";
List<CompletableFuture<User>> usersResult = IntStream.range(1, 10)
.boxed()
.map(num -> prefix.concat("" + num))
.map(name -> CompletableFuture.supplyAsync(
() -> callApi(name)))
.collect(Collectors.toList());
try {
CompletableFuture.allOf(usersResult.toArray(new CompletableFuture[usersResult.size()])).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
List<User> users = usersResult //the result I need
.stream()
.map(userCompletableFuture -> {
try {
return userCompletableFuture.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return null;
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
private static User callApi(String collection) {
return new User(); //potentially time-consuming operation
}
}
Run Code Online (Sandbox Code Playgroud)
我有以下问题:
try-catch流中的块,我将 CompletableFuture 映射到用户?这样做可以吗(所有的期货都会在流中解决吗?):
public class Main {
public static void main(String[] args) {
String prefix = "collection_";
List<User> usersResult = IntStream.range(1, 10)
.boxed()
.map(num -> prefix.concat("" + num))
.map(name -> CompletableFuture.supplyAsync(
() -> callApi(name)))
.filter(Objects::nonNull)
.map(userCompletableFuture -> {
try {
return userCompletableFuture.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return null;
})
.collect(Collectors.toList());
}
private static User callApi(String collection) {
return new User(); //potentially time-consuming operation
}
}
Run Code Online (Sandbox Code Playgroud)对于 1.,您可以完全跳过allOf().get()电话,因为您无论如何都要一一等待所有期货。¹
对于 2.,您可以try-catch通过执行以下操作来简化:
exceptionally()将来直接处理异常;join()而不是get()避免检查异常(并且您知道不可能有异常)。对于 3.,您不能真正减少顺序,因为您至少需要执行以下步骤:创建所有期货,然后处理它们的结果。
如果你在一个流中做所有事情,它会创建每个未来,然后在创建下一个之前立即等待它——所以你会失去并行性。您可以改用并行流,但使用CompletableFutures不会有太大好处。
所以最终的代码是:
List<CompletableFuture<User>> usersResult = IntStream.range(1, 10)
.boxed()
.map(num -> prefix.concat("" + num))
.map(name -> CompletableFuture.supplyAsync(() -> callApi(name))
.exceptionally(e -> {
e.printStackTrace();
return null;
}))
.collect(Collectors.toList());
List<User> users = usersResult
.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
¹请注意,allOf()如果您希望结果也为 a CompletableFuture<List<User>>,则仍然需要调用,例如
final CompletableFuture<List<User>> result =
CompletableFuture.allOf(usersResult.stream().toArray(CompletableFuture[]::new))
.thenApply(__ -> usersResult
.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList()));
Run Code Online (Sandbox Code Playgroud)