Get*_*ere 2 java asynchronous java-8 completable-future
我正在下面运行一些异步任务,需要等待它们全部完成。我不知道为什么,但它join()并没有强制等待所有任务,并且代码会继续执行而无需等待。连接流未按预期工作是否有原因?
CompletableFutures列表只是一个映射supplyAsync的流
List<Integer> items = Arrays.asList(1, 2, 3);
List<CompletableFuture<Integer>> futures = items
.stream()
.map(item -> CompletableFuture.supplyAsync(() -> {
System.out.println("processing");
// do some processing here
return item;
}))
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
我等待期货之类的。
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.thenApply(ignored -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
Run Code Online (Sandbox Code Playgroud)
我可以等待,futures.forEach(CompletableFuture::join);但我想知道为什么我的流方法不起作用。
这段代码:
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.thenApply(ignored -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
Run Code Online (Sandbox Code Playgroud)
不等待所有 futurefutures完成。它所做的是创建一个新的 future,它将等待所有异步执行futures完成后再完成它本身(但在所有这些 future 完成之前不会阻塞)。当allOf future 完成时,您的thenApply代码就会运行。但allOf()会立即返回,不会阻塞。
这意味着futures.stream().map(CompletableFuture::join).collect(Collectors.toList())您的代码仅在所有异步执行完成后运行,这违背了您的目的。所有呼叫join()都会立即返回。但这还不是更大的问题。您的挑战是allOf().thenApply()不会等待异步执行完成。它只会创造另一个不会阻碍的未来。
最简单的解决方案是使用第一个管道并映射到整数列表:
List<Integer> results = items.stream()
.map(item -> CompletableFuture.supplyAsync(() -> {
System.out.println("processing " + item);
// do some processing here
return item;
}))
.collect(Collectors.toList()) //force-submit all
.stream()
.map(CompletableFuture::join) //wait for each
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
如果您想使用类似于原始代码的内容,那么您的第二个代码段必须更改为:
List<Integer> reuslts = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
这是因为它CompletableFuture.allOf不会等待,它只是将所有 future 组合成一个新的 future,并在全部完成时完成:
返回一个新的 CompletableFuture,当所有给定的 CompletableFuture 完成时,该 CompletableFuture 也完成。
或者,您仍然可以使用allOf()with join(),然后运行当前thenApply()代码:
//wrapper future completes when all futures have completed
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.join();
//join() calls below return immediately
List<Integer> result = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
join()最后一条语句中的调用立即返回,因为对join()包装器 ( allOf()) future 的调用将等待传递给它的所有 future 完成。这就是为什么当您可以使用第一种方法时我不认为这样做的原因。
| 归档时间: |
|
| 查看次数: |
8604 次 |
| 最近记录: |