将 Flux 的结果与 Mono 的结果相结合

Nis*_*ant 6 project-reactor reactive-streams spring-webflux

我开始使用 Project reactor,其中一个让我苦苦挣扎的地方是如何将 Mono 的东西与 Flux 结合起来。这是我的用例:

public interface GroupRepository {
       Mono<GroupModel> getGroup(Long groupId);
}

public interface UserRepository {
       Flux<User> getUsers(Set<Long> userIds);   
}

Mono<GroupModel> groupMono = getGroup(groupId);
Flux<User> userFlux = getUsers(Set<Long> users);
//run above instrtuction in parallel and associate user to group.
Run Code Online (Sandbox Code Playgroud)

现在我想要实现的是:

如何组合来自 UserFlux 的响应并将这些用户与该组相关联,例如 group.addUsers(userfromFlux)。

有人可以帮助如何组合来自 userFlux 和 groupMono 的结果。我想我使用了 Zip 之类的东西,但它会从源代码进行一对一映射。就我而言,我需要进行 1 到 N 映射。在这里,我有一个组但需要添加到该组的多个用户。返回Mono<List<Users>然后将 zip 运算符与 mono 一起使用并提供此处提到的组合器是个好主意
public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, final BiFunction<? super T1, ? super T2, ? extends O> combinator)吗?

Ali*_*zan 10

这个 1 到 N 的映射听起来类似于我在这里回答的一个问题:

您可以 Flux.zip 一个单声道和一个通量,并为每个通量值重复单声道值吗?

如果该链接断开,这里再次提供答案。我认为这种方法不会有很好的性能,因为每次都会重新计算单声道。为了获得更好的性能,如果您的 Mono 围绕缓慢的操作,那么拥有一些缓存层可能会更好。

假设你有一个助焊剂和一个像这样的单声道:

 // a flux that contains 6 elements.
 final Flux<Integer> userIds = Flux.fromIterable(List.of(1,2,3,4,5,6));

 // a mono of 1 element.
 final Mono<String> groupLabel = Mono.just("someGroupLabel");
Run Code Online (Sandbox Code Playgroud)

首先,我将向您展示尝试压缩我尝试过的 2 的错误方法,我认为其他人会尝试:

 // wrong way - this will only emit 1 event 
 final Flux<Tuple2<Integer, String>> wrongWayOfZippingFluxToMono = userIds
         .zipWith(groupLabel);

 // you'll see that onNext() is only called once, 
 //     emitting 1 item from the mono and first item from the flux.
 wrongWayOfZippingFluxToMono
         .log()
         .subscribe();
Run Code Online (Sandbox Code Playgroud)

错误的方法

 // this is how to zip up the flux and mono how you'd want, 
 //     such that every time the flux emits, the mono emits. 
 final Flux<Tuple2<Integer, String>> correctWayOfZippingFluxToMono = userIds
         .flatMap(userId -> Mono.just(userId)
                 .zipWith(groupLabel));

 // you'll see that onNext() is called 6 times here, as desired. 
 correctWayOfZippingFluxToMono
         .log()
         .subscribe();
Run Code Online (Sandbox Code Playgroud)

在此输入图像描述


Sim*_*slé 5

我认为Flux.combineLatest静态方法可以帮助你:因为你Mono只发出 1 个元素,所以该元素将始终是与Flux.

Flux.combineLatest(arr -> new Combination((GroupModel) arr[0], (User) arr[1]),
                   groupMono, userFlux);
Run Code Online (Sandbox Code Playgroud)

  • 也许我没有得到什么,但是如果我将 `combineLatest` 与 `Flux` 和 `Mono` 一起使用,那么我不可能“删除”“Flux”中的第一个事件(如果“Mono”稍后会推出吗?例如:用户存储库速度非常快,在组存储库返回一个组之前已经返回了 10 个用户,那么我期望 `combineLatest` 后面的流中的第一个元素是第 10 个用户与该组的组合,因为`combineLatest` 实际上组合了*最新*元素。有人可以帮我解决这个问题吗? (3认同)