cyp*_*man 3 java reactive-programming project-reactor
我目前正在开发一个涉及一些反应式编程的项目。
我有 4 个不同的反应性存储库,分别从中得到 4 个不同的Mono<List<SomeType>>回报。目标是将它们组合成一个Mono<List<GeneralType>>,以便将其合并到自定义响应中以在ResponseEntity.ok(). 我已经创建了一个GeneralType并成功转换了一个Mono<List<SomeType>>,但是没有取得进一步的进展。
所有存储库都有相似的签名:
public Mono<List<SomeType>> findAllByUserId(UUID userId)
Run Code Online (Sandbox Code Playgroud)
我的回复中的字段将所有不同的列表合并为一个:
private Mono<List<GeneralType>> items;
Run Code Online (Sandbox Code Playgroud)
到目前为止我的方法是什么样的:
public Mono<List<GeneralType>> combineMonos(UUID userId) {
Mono<List<GeneralType>> combo1 = reactiveRepository.findAllByUserId(userId)
.map(list -> list.stream()
.map(GeneralType::new)
.collect(Collectors.toList()));
return combo1; // works just fine
}
Run Code Online (Sandbox Code Playgroud)
所有其他列表都有几乎相同的方法,但是将它们组合成一个 Mono<List> 是一个问题。
我尝试过以下方法:
return Flux.merge(combo1.flatMapMany(Flux::fromIterable), combo2.flatMapMany(Flux::fromIterable)).collectList();
Run Code Online (Sandbox Code Playgroud)
但这样一来,IDE 就会敦促将返回类型更改为Flux<Object>. 另外,有些列表可能为空,所以我不确定zip()这里是否有一个选项。我读过,如果至少有一个结果为空,它将返回所有内容为空。
所以问题是如何在不到处都是block()的情况下以有效的方式完成这一点?
Merge急切地连接到所有数据源。因此,当数据从任何源发出时,它将被传递到下游管道。结果列表中的顺序基于该项目的发出时间。
\nZip方法从源收集数据并将它们放入一个对象(Tuple \xe2\x80\x93 就像一个盒子)中并传递到下游。只要所有源都发出数据,Zip 就可以工作。任何源完成/抛出错误,它都会停止。
\n我认为您的个人方法运行良好。您的问题与将结果合并到单个列表中有关。
\nprivate Mono<List<String>> getList1(){\n return Mono.just(List.of("a", "b", "c"));\n}\n\nprivate Mono<List<String>> getList2(){\n return Mono.just(Collections.emptyList());\n}\n\nprivate Mono<List<String>> getList3(){\n return Mono.just(List.of("A", "B", "C"));\n}\n\n\n Flux.merge(getList1(), getList2(), getList3())\n .flatMapIterable(Function.identity())\n .collectList()\n .subscribe(System.out::println); // [a, b, c, A, B, C]\nRun Code Online (Sandbox Code Playgroud)\n参考:\n http://www.vinsguru.com/reactive-programming-reactor-combining-multiple-sources-of-flux-mono/
\n