如何将 Flux<Object> 列表转换为 List<Object>

Ton*_*kuk 6 reactive-programming java-8 java-stream project-reactor reactive

我有一个 Flux,我想将其转换为 List。我怎样才能做到这一点?

Flux<Object> getInstances(String serviceId); // Current one 
List<Object> getInstances(String serviceId); // Demanded one
Run Code Online (Sandbox Code Playgroud)

Java 8 或反应式组件有一个准备好的方法将其映射或转换为 List ??

我应该使用.map()

final List<ServiceInstance> sis = convertedStringList.parallelStream()
            .map( this.reactiveDiscoveryClient::getInstances )
            // It should be converted to List<Object>
Run Code Online (Sandbox Code Playgroud)

rph*_*rph 9

1.确定你想要这个

在深入研究其他内容之前,请注意:将 a 转换FluxList/Stream会使整个事情在概念的严格意义上不再是反应性的,因为您将离开推域并用拉域进行交易。根据用例,您可能需要也可能不需要(通常不需要)。只是想留下便条。

2. 将 Flux 转换为列表

根据Flux 文档,该collectList方法将返回一个Mono<List<T>>. 它会立即返回,但它不是结果列表本身,而是一个惰性结构,Mono承诺当序列完成时结果最终会在那里。

根据Mono 文档,该block方法完成后将返回 Mono 的内容。请记住,block可能会返回 null。

将两者结合起来,您可以使用someFlux.collectList().block(). 假设someFlux是 a Flux<Object>,则结果将是 a List<Object>

block如果 Flux 无限大,该方法将不会返回任何内容。例如,以下将返回包含两个单词的列表:

Flux.fromArray(new String[]{"foo", "bar"}).collectList().block()
Run Code Online (Sandbox Code Playgroud)

但以下内容将永远不会返回:

Flux.interval(Duration.ofMillis(1000)).collectList().block()
Run Code Online (Sandbox Code Playgroud)

为了防止无限期阻塞或阻塞时间过长,您可以将Duration参数传递给block,但是当订阅未按时完成时,这将超时并出现异常。

3. 将 Flux 转换为 Stream

根据Flux 文档,该toStream方法将 a 转换Flux<T>为 a Stream<T>。这对于诸如flatMap. 为了演示起见,请注意这个简单的例子:

Stream.of("f")
    .flatMap(letter ->
                Flux.fromArray(new String[]{"foo", "bar"})
                        .filter(word -> word.startsWith(letter)).toStream())
    .collect(Collectors.toList())
Run Code Online (Sandbox Code Playgroud)

可以简单地使用.collectList().block().stream(),但不仅可读性较差,而且如果block返回 null 也可能导致 NPE。这种方法也无法完成无限通量,但因为这是一个未知大小的流,所以您仍然可以在它完成之前对其进行一些操作,而不会阻塞。