CompletableFuture supplyAsync 与 Stream.map()

Ale*_*lex 3 concurrency

伙计们!我有一个问题:这段代码有什么作用:

Collection<Contract.class> contracts = fillTheCollectionFromDb();
contracts.stream().filter(condition)
                  .map(contractItem ->
                       CompletableFuture.supplyAsync(() -> 
                           {T result = getAnotherDataFromDb(contractItem); 
                            return result;}, Executor.class)
                  )//end .map
                  .map(CompletableFuture::join).collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

Ale*_*dov 6

此代码等效于:

Collection<Contract> contracts = fillTheCollectionFromDb();
contracts.stream().filter(condition)
              .map(this::getAnotherDataFromDb)
              .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

为了使这个程序真正并行,它应该被修改。首先,并行启动对数据库的所有请求:

Collection<Contract> contracts = fillTheCollectionFromDb();
List<CompletableFuture> futures = contracts.stream().filter(condition)
              .map(contractItem ->
                   CompletableFuture.supplyAsync(
                         ()->getAnotherDataFromDb(contractItem),
                       executor)
              )//end .map
              .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

然后才收集所有结果:

List results = futures.stream
      .map(CompletableFuture::join)
      .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)