如何在不阻塞的情况下将 CompletableFuture<Stream<T>> 转换为 Stream<T>

rod*_*ino 6 lambda asynchronous java-8 java-stream completable-future

我正在使用Async Http Client 库(使用 Netty)向 RESTful API 发出异步 Http Get 请求。由于我想保留非阻塞行为,因此我将CompletableFuture<T>作为 Http Get 请求的结果返回的实例。因此,在 RESTful API 端点返回一个 Json 数组的地方,我返回一个CompletableFuture<T[]>.

然而,根据 Erik Meijer 对编程中的四种基本效果所做的分类,我认为这Stream<T>更适合于发出异步 Http Get 请求并返回 Json 数组的 Java 方法的结果。在这种情况下,我们可以将Stream<T>视为Observable<T>等价物,它是返回许多值的异步计算结果

所以,考虑到这resp持有响应,那么我可以得到CompletableFuture<Stream<T>>如下:

 CompletableFuture<T[]> resp = …
 return resp.thenApply(Arrays::stream);
Run Code Online (Sandbox Code Playgroud)

但是,我想知道如何将 the 转换CompletableFuture<Stream<T>> resp为 a Stream<T>,而无需等待计算完成(即我不想在get()调用时阻塞)?

我希望得到与以下表达式相同的结果,但不阻塞get()

return resp.thenApply(Arrays::stream).get();
Run Code Online (Sandbox Code Playgroud)

Mig*_*boa 6

您可以构建一个Stream<T>将调用推迟到Future<T> get()方法,就像这样:

CompletableFuture<T[]> resp = ...
return Stream
        .of(resp)                               // Stream<CompletableFuture<T[]>>
        .flatMap(f -> Arrays.stream(f.join())); // Stream<T>
Run Code Online (Sandbox Code Playgroud)

为了简化使用,而不是get()join()用来避免检查异常。

  • 当然。但只有在穿越时。在 OP 中,作者也没有遍历 `[]T`。 (3认同)
  • 这并没有改变一个事实,流操作将被阻塞,直到 `CompletableFuture` 整个完成。它只是延迟了一点——代价是高昂的。 (2认同)
  • 的确。我只想强调这些限制(因为我不确定 OP 是否知道它们)。我添加了一个更详细的答案。 (2认同)