bal*_*teo 2 reactive-programming project-reactor
我正在尝试使用项目反应 mergeWith器运算符来实现if/elseif/else此处描述的分支逻辑:RxJS,其中是 If-Else Operator。
提供的示例是用 RxJS 编写的,但基本思想保持不变。
filter基本上这个想法是在 3 上使用运算符(因此具有 3 个不同的谓词)并按如下方式monos/publishers合并 3 (这里当然是 RxJS):monosObservables
const somethings$ = source$
.filter(isSomething)
.do(something);
const betterThings$ = source$
.filter(isBetterThings)
.do(betterThings);
const defaultThings$ = source$
.filter((val) => !isSomething(val) && !isBetterThings(val))
.do(defaultThing);
// merge them together
const onlyTheRightThings$ = somethings$
.merge(
betterThings$,
defaultThings$,
)
.do(correctThings);
Run Code Online (Sandbox Code Playgroud)
我复制并粘贴了上述文章中的相关示例。
考虑something$,betterThings$和defaultThings$是我们的单元isSomething&isBetterThings是谓词。
现在这是我的 3 个真实的monos/publishers(用 java 编写的):
private Mono<ServerResponse> validateUser(User user) {
return Mono.just(new BeanPropertyBindingResult(user, User.class.getName()))
.doOnNext(err -> userValidator.validate(user, err))
.filter(AbstractBindingResult::hasErrors)
.flatMap(err ->
status(BAD_REQUEST)
.contentType(APPLICATION_JSON)
.body(BodyInserters.fromObject(err.getAllErrors()))
);
}
private Mono<ServerResponse> validateEmailNotExists(User user) {
return userRepository.findByEmail(user.getEmail())
.flatMap(existingUser ->
status(BAD_REQUEST)
.contentType(APPLICATION_JSON)
.body(BodyInserters.fromObject("User already exists."))
);
}
private Mono<ServerResponse> saveUser(User user) {
return userRepository.save(user)
.flatMap(newUser -> status(CREATED)
.contentType(APPLICATION_JSON)
.body(BodyInserters.fromObject(newUser))
);
}
Run Code Online (Sandbox Code Playgroud)
这是需要合并三个的顶级方法publishers:
public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
return serverRequest.bodyToMono(User.class)
.mergeWith(...)
}
Run Code Online (Sandbox Code Playgroud)
我不知道如何使用该mergeWith()运算符...我尝试过Mono.when()静态运算符,它需要多个发布者(对我有利)但返回一个Mono<void>(对我不利)。
有人可以帮忙吗?
PS 我相信你会原谅 RxJS (js) 和 Reactor 代码 (java) 之间的混合。我打算利用 RxJS 中的知识在我的 Reactor 应用程序中实现类似的目标。:-)
编辑1:我已经尝试过这个:
public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
return serverRequest
.bodyToMono(User.class)
.flatMap(user -> validateUser(user).or(validateEmailNotExists(user)).or(saveUser(user))).single();
}
Run Code Online (Sandbox Code Playgroud)
但我收到这个错误:NoSuchElementException: Source was empty
编辑2:与(注意括号)相同:
public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
return serverRequest
.bodyToMono(User.class)
.flatMap(user -> validateUser(user).or(validateEmailNotExists(user)).or(saveUser(user)).single());
}
Run Code Online (Sandbox Code Playgroud)
编辑 3:与 a 相同的错误Mono<User>:
public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
Mono<User> userMono = serverRequest.bodyToMono(User.class);
return validateUser(userMono)
.or(validateEmailNotExists(userMono))
.or(saveUser(userMono))
.single();
}
Run Code Online (Sandbox Code Playgroud)
编辑 4:我可以确认三个单声道中至少有一个将始终发射。当我使用or()操作符时,就会出现问题......
如果我使用它,我的所有测试都会通过:
public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
return serverRequest.bodyToMono(User.class)
.flatMap(user -> Flux.concat(validateUser(user), validateEmailNotExists(user), saveUser(user)).next().single());
}
Run Code Online (Sandbox Code Playgroud)
我在这里使用了concat()运算符来保留运算顺序。
你知道我对运营商有什么误解吗or()?
编辑5:我已与cache()操作员尝试如下,但无济于事:
public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
return serverRequest
.bodyToMono(User.class)
.cache()
.flatMap(user -> validateUser(user)
.or(validateEmailNotExists(user))
.or(saveUser(user))
.single()
);
}
Run Code Online (Sandbox Code Playgroud)
您当前的代码示例意味着您返回的 3 个方法Mono<ServerResponse>应该采用 aMono<User>而不是 a User,因此您可能需要在那里进行一些更改。
然而,我离题了——这似乎不是这里的主要问题。
根据我对该链接中描述的模式的理解,您正在创建 3 个独立的Mono对象,其中只有一个会返回结果 - 并且您需要Mono原始 3 个Mono对象中返回的一个。
在这种情况下,我会推荐如下内容:
Mono<ServerResult> result = Flux.merge(validateUser(user), validateEmailNotExists(user), saveUser(user)).next().single();
Run Code Online (Sandbox Code Playgroud)
分解一下:
Flux.merge()方法获取 3 个Mono对象并将它们合并到一个Flux;中。next()Mono以;形式返回第一个可用结果single()将确保Mono发出一个值,而不是什么都不发出,否则抛出异常。(可选,但只是一点安全网。)你也可以像这样链接Mono.or():
Mono<ServerResult> result = validateUser(user).or(validateEmailNotExists(user)).or(saveUser(user)).single();
Run Code Online (Sandbox Code Playgroud)
这种方法的优点是:
Mono返回结果,这允许您设置选择一个结果的优先顺序(与上面的示例相反,在上面的示例中您将只获得第一个Mono发出值的值) .)缺点之一是性能方面。如果saveUser()在上面的代码中首先返回一个值,那么您仍然需要等待其他两个Mono对象完成才能Mono完成合并。