Mig*_*boa 17 java java-8 rx-java project-reactor completable-future
允许对结果流进行多次迭代,CompletableFuture<Stream<String>>我正在考虑以下方法之一:
将生成的未来转换为CompletableFuture<List<String>>:teams.thenApply(st -> st.collect(toList()))
将生成的未来转换为Flux<String>缓存:Flux.fromStream(teams::join).cache();
Flux<T>是Publisher<T>项目反应堆的实施.
使用案例:
我想Stream<String>从一个数据源获得一个具有顶级联赛球队名称的序列(例如),该数据源提供一个League对象Standing[](基于足球数据RESTful API,例如http://api.football-data.org/v1/ soccerseasons/445/leagueTable).使用AsyncHttpClient和Gson我们有:
CompletableFuture<Stream<String>> teams = asyncHttpClient
.prepareGet("http://api.football-data.org/v1/soccerseasons/445/leagueTable")
.execute()
.toCompletableFuture()
.thenApply(Response::getResponseBody)
.thenApply(body -> gson.fromJson(body, League.class));
.thenApply(l -> stream(l.standings).map(s -> s.teamName));
Run Code Online (Sandbox Code Playgroud)
要重新使用生成的流,我有两个选择:
1. CompletableFuture<List<String>> res = teams.thenApply(st -> st.collect(toList()))
2. Flux<String> res = Flux.fromStream(teams::join).cache()
Run Code Online (Sandbox Code Playgroud)
Flux<T>不那么冗长,并提供我所需要的一切.然而,在这种情况下使用它是否正确?
或者我应该使用CompletableFuture<List<String>>?或者还有其他更好的选择吗?
更新了一些想法(2018-03-16):
CompletableFuture<List<String>>:
List<String>将继续收集,当我们需要继续处理未来的结果时,可能已经完成了.List<T>.Flux<String>:
.cache()并转发到下一层,这可以利用反应API,例如网络通量反应控制器,例如@GetMapping(produces =MediaType.TEXT_EVENT_STREAM) public Flux<String> getTeams() {…}Flux<T>我们必须将它包装在cacheable Flux<T>(….cache())中,这反过来会增加第一次遍历的开销,因为它必须将结果项存储在内部缓存中. CompletableFuture<Stream<String>> teams = ...;
Flux<String> teamsFlux = Mono.fromFuture(teams).flatMapMany(stream -> Flux.fromStream(stream));
Run Code Online (Sandbox Code Playgroud)
编辑:
Flux.fromStream(teams::join) 是一个代码味道,因为它持有一个线程来从CompletableFuture获取结果,该结果在另一个线程上运行.
| 归档时间: |
|
| 查看次数: |
2030 次 |
| 最近记录: |