and*_*onc 3 java reactive-streams spring-webflux
我创建了以下方法来查找 Analysis 对象,更新其结果字段,然后最后将结果保存在数据库中,但不等待返回。
public void updateAnalysisWithResults(String uuidString, String results) {
findByUUID(uuidString).subscribe(analysis -> {
analysis.setResults(results);
computeSCARepository.save(analysis).subscribe();
});
}
Run Code Online (Sandbox Code Playgroud)
在订阅中订阅感觉写得不好。这是一个不好的做法吗?有更好的方法来写这个吗?
更新:入口点
@PatchMapping("compute/{uuid}/results")
public Mono<Void> patchAnalysisWithResults(@PathVariable String uuid, @RequestBody String results) {
return computeSCAService.updateAnalysisWithResults(uuid,results);
}
Run Code Online (Sandbox Code Playgroud)
public Mono<Void> updateAnalysisWithResults(String uuidString, String results) {
// findByUUID(uuidString).subscribe(analysis -> {
// analysis.setResults(results);
// computeSCARepository.save(analysis).subscribe();
// });
return findByUUID(uuidString)
.doOnNext(analysis -> analysis.setResults(results))
.doOnNext(computeSCARepository::save)
.then();
}
Run Code Online (Sandbox Code Playgroud)
Tho*_*olf 11
为什么它不起作用是因为你误解了它的doOnNext作用。
让我们从头开始。
AFlux或Mono是生产者,他们生产物品。您的应用程序会向调用客户端生成内容,因此它应该始终返回 aMono或 a Flux。如果您不想返回任何内容,则应该返回Mono<Void>.
当您的应用程序的客户端时,subscribesreactor 会做的是朝相反方向调用所有运算符,直到找到producer. 这就是所谓的assembly phase。如果你所有的操作员不连锁在一起,你就是我所说的breaking the reactive chain。
当你断开链条时,从链条上断开的东西将不会被执行。
如果我们看一下你的例子,但是是一个更分解的版本:
@Test
void brokenChainTest() {
updateAnalysisWithResults("12345", "Foo").subscribe();
}
public Mono<Void> updateAnalysisWithResults(String uuidString, String results) {
return findByUUID(uuidString)
.doOnNext(analysis -> analysis.setValue(results))
.doOnNext(this::save)
.then();
}
private Mono<Data> save(Data data) {
return Mono.fromCallable(() -> {
System.out.println("Will not print");
return data;
});
}
private Mono<Data> findByUUID(String uuidString) {
return Mono.just(new Data());
}
private static class Data {
private String value;
public void setValue(String value) {
this.value = value;
}
}
Run Code Online (Sandbox Code Playgroud)
上面的示例中save是callable一个将返回producer. 但是如果我们运行上面的函数,您会发现它print永远不会被执行。
这与 的使用有关doOnNext。如果我们阅读它的文档,它会说:
添加 Mono 成功发出数据时触发的行为。首先执行 Consumer,然后将 onNext 信号传播到下游。
doOnNext接受Consumer返回 void 的 a。如果我们看一下,doOnNext我们会发现函数描述如下:
public final Mono<T> doOnNext(Consumer<? super T> onNext)`
Run Code Online (Sandbox Code Playgroud)
这意味着它接受一个消费者 aT或扩展 aT并返回 a Mono<T>。因此,为了长话短说,您可以看到它消耗了一些东西,但也返回了相同的东西。
这意味着这通常用于所谓的基本上是在不阻碍电流的一侧side effects完成的事情。其中之一可以例如记录。日志记录是需要消耗字符串并记录它的事情之一,而我们希望保持字符串在我们的程序中流动。或者也许我们想在侧面增加一个数字。或者修改某处的某些状态。您可以在这里阅读有关副作用的所有信息。
你可以这样直观地思考它:
_____ side effect (for instance logging)
/
___/______ main reactive flow
Run Code Online (Sandbox Code Playgroud)
这就是为什么你的第一个doOnNext设置器起作用,因为你正在修改侧面的状态,你正在你的类上设置值,从而修改你的类的状态以具有值。
另一方面,第二条语句“保存”不会被执行。您会看到该函数实际上返回了我们需要处理的内容。
它看起来是这样的:
save
_____
/ \ < Broken return
___/ ____ no main reactive flow
Run Code Online (Sandbox Code Playgroud)
我们所要做的实际上只是更改一行:
// From
.doOnNext(this::save)
// To
.flatMap(this::save)
Run Code Online (Sandbox Code Playgroud)
flatMap获取 中的任何内容Mono,然后我们可以使用它来执行某些内容,然后返回一个“新”内容。
所以我们的流程(使用 flatMap)现在看起来像这样:
setValue() save()
______ _____
/ / \
__/____________/ \______ return to client
Run Code Online (Sandbox Code Playgroud)
因此,通过使用,flatMap我们现在可以保存并返回从该函数返回的任何内容,从而触发链的其余部分。
如果您选择忽略从返回的任何内容,那么flatMap它的做法完全正确,就像您调用then它所做的那样
返回一个 Mono,它只重放来自此的完整信号和错误信号
一般规则是,在完全响应式应用程序中,您永远不应该阻塞。
subscribe除非您的申请是最终的,否则您通常不会这样做consumer。这意味着如果您的应用程序启动了请求,那么您就是consumer其他东西的,所以您subscribe. 如果网页从请求开始,那么他们就是最终的consumer,他们正在订阅。
如果您在生成数据的应用程序中订阅,就像您在经营一家面包店并同时吃烤面包一样。
不要这样做,这对生意不利:D