将CompletableFuture <Stream <T >>转换为Publisher <T>是否正确?

Mig*_*boa 17 java java-8 rx-java project-reactor completable-future

允许对结果流进行多次迭代,CompletableFuture<Stream<String>>我正在考虑以下方法之一:

  1. 将生成的未来转换为CompletableFuture<List<String>>:teams.thenApply(st -> st.collect(toList()))

  2. 将生成的未来转换为Flux<String>缓存:Flux.fromStream(teams::join).cache();

Flux<T>Publisher<T>项目反应堆的实施.

使用案例:

我想Stream<String>从一个数据源获得一个具有顶级联赛球队名称的序列(例如),该数据源提供一个League对象Standing[](基于足球数据RESTful API,例如http://api.football-data.org/v1/ soccerseasons/445/leagueTable).使用AsyncHttpClientGson我们有:

CompletableFuture<Stream<String>> teams = asyncHttpClient
    .prepareGet("http://api.football-data.org/v1/soccerseasons/445/leagueTable")
    .execute()
    .toCompletableFuture()
    .thenApply(Response::getResponseBody)
    .thenApply(body -> gson.fromJson(body, League.class));
    .thenApply(l -> stream(l.standings).map(s -> s.teamName));
Run Code Online (Sandbox Code Playgroud)

要重新使用生成的流,我有两个选择:

1. CompletableFuture<List<String>> res = teams.thenApply(st -> st.collect(toList()))

2. Flux<String> res = Flux.fromStream(teams::join).cache()
Run Code Online (Sandbox Code Playgroud)

Flux<T>不那么冗长,并提供我所需要的一切.然而,在这种情况下使用它是否正确?

或者我应该使用CompletableFuture<List<String>>?或者还有其他更好的选择吗?

更新了一些想法(2018-03-16):

CompletableFuture<List<String>>:

  • [PROS] List<String>将继续收集,当我们需要继续处理未来的结果时,可能已经完成了.
  • [CONS]宣言冗长.
  • [CONS]如果我们只想使用它一次,那么我们就不需要收集那些物品了List<T>.

Flux<String>:

  • [PROS]宣言简明扼要
  • [PROS]如果我们只想使用它一次,那么我们可以省略.cache()并转发到下一层,这可以利用反应API,例如网络通量反应控制器,例如@GetMapping(produces =MediaType.TEXT_EVENT_STREAM) public Flux<String> getTeams() {…}
  • [CONS]如果我们想重用它,Flux<T>我们必须将它包装在cacheable Flux<T>(….cache())中,这反过来会增加第一次遍历的开销,因为它必须将结果项存储在内部缓存中.

you*_*ans 7

    CompletableFuture<Stream<String>> teams = ...;
    Flux<String> teamsFlux = Mono.fromFuture(teams).flatMapMany(stream -> Flux.fromStream(stream));
Run Code Online (Sandbox Code Playgroud) 编辑:

Flux.fromStream(teams::join) 是一个代码味道,因为它持有一个线程来从CompletableFuture获取结果,该结果在另一个线程上运行.