Nis*_*ant 6 project-reactor reactive-streams spring-webflux
我开始使用 Project reactor,其中一个让我苦苦挣扎的地方是如何将 Mono 的东西与 Flux 结合起来。这是我的用例:
public interface GroupRepository {
Mono<GroupModel> getGroup(Long groupId);
}
public interface UserRepository {
Flux<User> getUsers(Set<Long> userIds);
}
Mono<GroupModel> groupMono = getGroup(groupId);
Flux<User> userFlux = getUsers(Set<Long> users);
//run above instrtuction in parallel and associate user to group.
Run Code Online (Sandbox Code Playgroud)
现在我想要实现的是:
如何组合来自 UserFlux 的响应并将这些用户与该组相关联,例如 group.addUsers(userfromFlux)。
有人可以帮助如何组合来自 userFlux 和 groupMono 的结果。我想我使用了 Zip 之类的东西,但它会从源代码进行一对一映射。就我而言,我需要进行 1 到 N 映射。在这里,我有一个组但需要添加到该组的多个用户。返回Mono<List<Users>然后将 zip 运算符与 mono 一起使用并提供此处提到的组合器是个好主意
public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
final BiFunction<? super T1, ? super T2, ? extends O> combinator)吗?
Ali*_*zan 10
这个 1 到 N 的映射听起来类似于我在这里回答的一个问题:
您可以 Flux.zip 一个单声道和一个通量,并为每个通量值重复单声道值吗?
如果该链接断开,这里再次提供答案。我认为这种方法不会有很好的性能,因为每次都会重新计算单声道。为了获得更好的性能,如果您的 Mono 围绕缓慢的操作,那么拥有一些缓存层可能会更好。
假设你有一个助焊剂和一个像这样的单声道:
// a flux that contains 6 elements.
final Flux<Integer> userIds = Flux.fromIterable(List.of(1,2,3,4,5,6));
// a mono of 1 element.
final Mono<String> groupLabel = Mono.just("someGroupLabel");
Run Code Online (Sandbox Code Playgroud)
首先,我将向您展示尝试压缩我尝试过的 2 的错误方法,我认为其他人会尝试:
// wrong way - this will only emit 1 event
final Flux<Tuple2<Integer, String>> wrongWayOfZippingFluxToMono = userIds
.zipWith(groupLabel);
// you'll see that onNext() is only called once,
// emitting 1 item from the mono and first item from the flux.
wrongWayOfZippingFluxToMono
.log()
.subscribe();
Run Code Online (Sandbox Code Playgroud)
// this is how to zip up the flux and mono how you'd want,
// such that every time the flux emits, the mono emits.
final Flux<Tuple2<Integer, String>> correctWayOfZippingFluxToMono = userIds
.flatMap(userId -> Mono.just(userId)
.zipWith(groupLabel));
// you'll see that onNext() is called 6 times here, as desired.
correctWayOfZippingFluxToMono
.log()
.subscribe();
Run Code Online (Sandbox Code Playgroud)
我认为Flux.combineLatest静态方法可以帮助你:因为你Mono只发出 1 个元素,所以该元素将始终是与Flux.
Flux.combineLatest(arr -> new Combination((GroupModel) arr[0], (User) arr[1]),
groupMono, userFlux);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
8216 次 |
| 最近记录: |