CompletableFuture以异步执行多个数据库查询

The*_*nce 5 java parallel-processing multithreading java-8 completable-future

我想并行执行多个数据库查询,并将结果存储在映射中。我正在尝试这样做,但是访问地图时地图没有完全填充。

我做错什么了吗?

 public Map<MapKeyEnums, Set<String>> doDBCalls(String phoneNumber, long timestamp) {

         Map<MapKeyEnums, Set<String>> instrumentsEdgesMap = new EnumMap<>(MapKeyEnums.class);

         CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "ABC", timestamp)).
                    thenApply(x -> instrumentsEdgesMap.put(MapKeyEnums.ABC, x));

         CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "XYZ", timestamp)).
                    thenApply(x -> instrumentsEdgesMap.put(MapKeyEnums.XYZ, x));

         CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "DEF", timestamp)).
                    thenApply(x -> instrumentsEdgesMap.put(MapKeyEnums.DEF, x));

         return instrumentsEdgesMap;

}
Run Code Online (Sandbox Code Playgroud)

任何帮助将不胜感激,在此先感谢。

Dea*_*ool 5

在上面的方法中supplyAsync将由ForkJoinPoolAsync线程执行,但方法总是通过调用线程来执行。因此,您的查询将按顺序一个接一个地运行,而不是异步的thenApply

所有没有显式 Executor 参数的异步方法都使用 ForkJoinPool.commonPool() 执行(除非它不支持至少两个并行级别,在这种情况下,会创建一个新线程来运行每个任务)。

这是例子

CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread().getName());
        return "SupplyAsync";
    }).thenAccept(i->{
    System.out.println(Thread.currentThread().getName()+"--"+i);
    });
Run Code Online (Sandbox Code Playgroud)

输出 :

ForkJoinPool.commonPool-worker-3
main--SupplyAsync
Run Code Online (Sandbox Code Playgroud)

因此,如果您希望您的流程成为Async那么首先触发所有三个数据库查询supplyAsync并捕获其中的输出CompletableFuture

CompletableFuture<Set<String>> first =  CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "ABC", timestamp));

CompletableFuture<Set<String>> second =  CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "XYZ", timestamp));

CompletableFuture<Set<String>> third =  CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "DEF", timestamp));
Run Code Online (Sandbox Code Playgroud)

然后现在用其中三个创建一个流,然后将它们收集到 Map

Stream.of(new AbstractMap.SimpleEntry<MapKeyEnums, CompletableFuture<Set<String>>>(MapKeyEnums.ABC, first),
              new AbstractMap.SimpleEntry<MapKeyEnums, CompletableFuture<Set<String>>>(MapKeyEnums.XYZ, second),
              new AbstractMap.SimpleEntry<MapKeyEnums, CompletableFuture<Set<String>>>(MapKeyEnums.DEF, third))
       .forEach(entry->{
           entry.getValue().thenAccept(val-> instrumentsEdgesMap.put(entry.getKey(), val));
       });
Run Code Online (Sandbox Code Playgroud)