Ton*_*kuk 6 reactive-programming java-8 java-stream project-reactor reactive
我有一个 Flux,我想将其转换为 List。我怎样才能做到这一点?
Flux<Object> getInstances(String serviceId); // Current one
List<Object> getInstances(String serviceId); // Demanded one
Run Code Online (Sandbox Code Playgroud)
Java 8 或反应式组件有一个准备好的方法将其映射或转换为 List ??
我应该使用.map()
final List<ServiceInstance> sis = convertedStringList.parallelStream()
.map( this.reactiveDiscoveryClient::getInstances )
// It should be converted to List<Object>
Run Code Online (Sandbox Code Playgroud)
在深入研究其他内容之前,请注意:将 a 转换Flux为List/Stream会使整个事情在概念的严格意义上不再是反应性的,因为您将离开推域并用拉域进行交易。根据用例,您可能需要也可能不需要(通常不需要)。只是想留下便条。
根据Flux 文档,该collectList方法将返回一个Mono<List<T>>. 它会立即返回,但它不是结果列表本身,而是一个惰性结构,Mono承诺当序列完成时结果最终会在那里。
根据Mono 文档,该block方法完成后将返回 Mono 的内容。请记住,block可能会返回 null。
将两者结合起来,您可以使用someFlux.collectList().block(). 假设someFlux是 a Flux<Object>,则结果将是 a List<Object>。
block如果 Flux 无限大,该方法将不会返回任何内容。例如,以下将返回包含两个单词的列表:
Flux.fromArray(new String[]{"foo", "bar"}).collectList().block()
Run Code Online (Sandbox Code Playgroud)
但以下内容将永远不会返回:
Flux.interval(Duration.ofMillis(1000)).collectList().block()
Run Code Online (Sandbox Code Playgroud)
为了防止无限期阻塞或阻塞时间过长,您可以将Duration参数传递给block,但是当订阅未按时完成时,这将超时并出现异常。
根据Flux 文档,该toStream方法将 a 转换Flux<T>为 a Stream<T>。这对于诸如flatMap. 为了演示起见,请注意这个简单的例子:
Stream.of("f")
.flatMap(letter ->
Flux.fromArray(new String[]{"foo", "bar"})
.filter(word -> word.startsWith(letter)).toStream())
.collect(Collectors.toList())
Run Code Online (Sandbox Code Playgroud)
可以简单地使用.collectList().block().stream(),但不仅可读性较差,而且如果block返回 null 也可能导致 NPE。这种方法也无法完成无限通量,但因为这是一个未知大小的流,所以您仍然可以在它完成之前对其进行一些操作,而不会阻塞。
| 归档时间: |
|
| 查看次数: |
9273 次 |
| 最近记录: |