将多个 Mono<List<Item>> 合并为一个

cyp*_*man 3 java reactive-programming project-reactor

我目前正在开发一个涉及一些反应式编程的项目。

我有 4 个不同的反应性存储库,分别从中得到 4 个不同的Mono<List<SomeType>>回报。目标是将它们组合成一个Mono<List<GeneralType>>,以便将其合并到自定义响应中以在ResponseEntity.ok(). 我已经创建了一个GeneralType并成功转换了一个Mono<List<SomeType>>,但是没有取得进一步的进展。

所有存储库都有相似的签名:

public Mono<List<SomeType>> findAllByUserId(UUID userId)
Run Code Online (Sandbox Code Playgroud)

我的回复中的字段将所有不同的列表合并为一个:

private Mono<List<GeneralType>> items;
Run Code Online (Sandbox Code Playgroud)

到目前为止我的方法是什么样的:

public Mono<List<GeneralType>> combineMonos(UUID userId) {
    Mono<List<GeneralType>> combo1 = reactiveRepository.findAllByUserId(userId)
        .map(list -> list.stream()
            .map(GeneralType::new)
            .collect(Collectors.toList()));
    return combo1; // works just fine
}
Run Code Online (Sandbox Code Playgroud)

所有其他列表都有几乎相同的方法,但是将它们组合成一个 Mono<List> 是一个问题。

我尝试过以下方法:

return Flux.merge(combo1.flatMapMany(Flux::fromIterable), combo2.flatMapMany(Flux::fromIterable)).collectList();
Run Code Online (Sandbox Code Playgroud)

但这样一来,IDE 就会敦促将返回类型更改为Flux<Object>. 另外,有些列表可能为空,所以我不确定zip()这里是否有一个选项。我读过,如果至少有一个结果为空,它将返回所有内容为空。

所以问题是如何在不到处都是block()的情况下以有效的方式完成这一点?

vin*_*ins 5

Merge急切地连接到所有数据源。因此,当数据从任何源发出时,它将被传递到下游管道。结果列表中的顺序基于该项目的发出时间。

\n

Zip方法从源收集数据并将它们放入一个对象(Tuple \xe2\x80\x93 就像一个盒子)中并传递到下游。只要所有源都发出数据,Zip 就可以工作。任何源完成/抛出错误,它都会停止。

\n
\n

我认为您的个人方法运行良好。您的问题与将结果合并到单个列表中有关。

\n
private Mono<List<String>> getList1(){\n    return Mono.just(List.of("a", "b", "c"));\n}\n\nprivate Mono<List<String>> getList2(){\n    return Mono.just(Collections.emptyList());\n}\n\nprivate Mono<List<String>> getList3(){\n    return Mono.just(List.of("A", "B", "C"));\n}\n\n\n    Flux.merge(getList1(), getList2(), getList3())\n            .flatMapIterable(Function.identity())\n            .collectList()\n            .subscribe(System.out::println);  // [a, b, c, A, B, C]\n
Run Code Online (Sandbox Code Playgroud)\n
\n

参考:\n http://www.vinsguru.com/reactive-programming-reactor-combining-multiple-sources-of-flux-mono/

\n