SupplyAsync 等待所有 CompletableFutures 完成

Get*_*ere 2 java asynchronous java-8 completable-future

我正在下面运行一些异步任务,需要等待它们全部完成。我不知道为什么,但它join()并没有强制等待所有任务,并且代码会继续执行而无需等待。连接流未按预期工作是否有原因?

CompletableFutures列表只是一个映射supplyAsync的流

List<Integer> items = Arrays.asList(1, 2, 3);

List<CompletableFuture<Integer>> futures = items
                .stream()
                .map(item -> CompletableFuture.supplyAsync(() ->  {

                    System.out.println("processing");
                    // do some processing here
                    return item;

                }))
                .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

我等待期货之类的。

CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
Run Code Online (Sandbox Code Playgroud)

我可以等待,futures.forEach(CompletableFuture::join);但我想知道为什么我的流方法不起作用。

ern*_*t_k 5

这段代码:

CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
Run Code Online (Sandbox Code Playgroud)

等待所有 futurefutures完成。它所做的是创建一个新的 future,它将等待所有异步执行futures完成后再完成它本身(但在所有这些 future 完成之前不会阻塞)。当allOf future 完成时,您的thenApply代码就会运行。但allOf()会立即返回,不会阻塞。

这意味着futures.stream().map(CompletableFuture::join).collect(Collectors.toList())您的代码仅在所有异步执行完成后运行,这违背了您的目的。所有呼叫join()都会立即返回。但这还不是更大的问题。您的挑战是allOf().thenApply()不会等待异步执行完成。它只会创造另一个不会阻碍的未来。

最简单的解决方案是使用第一个管道并映射到整数列表:

List<Integer> results = items.stream()
    .map(item -> CompletableFuture.supplyAsync(() -> {

        System.out.println("processing " + item);
        // do some processing here
        return item;

    }))
    .collect(Collectors.toList()) //force-submit all
    .stream()
    .map(CompletableFuture::join) //wait for each
    .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

如果您想使用类似于原始代码的内容,那么您的第二个代码段必须更改为:

List<Integer> reuslts = futures.stream()
    .map(CompletableFuture::join)
    .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

这是因为它CompletableFuture.allOf不会等待,它只是将所有 future 组合成一个新的 future,并在全部完成时完成:

返回一个新的 CompletableFuture,当所有给定的 CompletableFuture 完成时,该 CompletableFuture 也完成。

或者,您仍然可以使用allOf()with join(),然后运行当前thenApply()代码:

//wrapper future completes when all futures have completed
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
        .join(); 

//join() calls below return immediately
List<Integer> result = futures.stream()
        .map(CompletableFuture::join) 
        .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

join()最后一条语句中的调用立即返回,因为对join()包装器 ( allOf()) future 的调用将等待传递给它的所有 future 完成。这就是为什么当您可以使用第一种方法时我不认为这样做的原因。