Spring Webflux 查找和保存的正确方法

and*_*onc 3 java reactive-streams spring-webflux

我创建了以下方法来查找 Analysis 对象,更新其结果字段,然后最后将结果保存在数据库中,但不等待返回。

public void updateAnalysisWithResults(String uuidString, String results) {
        findByUUID(uuidString).subscribe(analysis -> {
            analysis.setResults(results);
            computeSCARepository.save(analysis).subscribe();
        });
    }
Run Code Online (Sandbox Code Playgroud)

在订阅中订阅感觉写得不好。这是一个不好的做法吗?有更好的方法来写这个吗?

更新:入口点

@PatchMapping("compute/{uuid}/results")
    public Mono<Void> patchAnalysisWithResults(@PathVariable String uuid, @RequestBody String results) {
        return computeSCAService.updateAnalysisWithResults(uuid,results);
    }
Run Code Online (Sandbox Code Playgroud)
    public Mono<Void> updateAnalysisWithResults(String uuidString, String results) {
//        findByUUID(uuidString).subscribe(analysis -> {
//            analysis.setResults(results);
//            computeSCARepository.save(analysis).subscribe();
//        });
        return findByUUID(uuidString)
                .doOnNext(analysis -> analysis.setResults(results))
                .doOnNext(computeSCARepository::save)
                .then();
    }
Run Code Online (Sandbox Code Playgroud)

Tho*_*olf 11

为什么它不起作用是因为你误解了它的doOnNext作用。

让我们从头开始。

AFluxMono是生产者,他们生产物品。您的应用程序会向调用客户端生成内容,因此它应该始终返回 aMono或 a Flux。如果您不想返回任何内容,则应该返回Mono<Void>.

当您的应用程序的客户端时,subscribesreactor 会做的是朝相反方向调用所有运算符,直到找到producer. 这就是所谓的assembly phase。如果你所有的操作员不连锁在一起,你就是我所说的breaking the reactive chain

当你断开链条时,从链条上断开的东西将不会被执行。

如果我们看一下你的例子,但是是一个更分解的版本:

@Test
void brokenChainTest() {
    updateAnalysisWithResults("12345", "Foo").subscribe();
}

public Mono<Void> updateAnalysisWithResults(String uuidString, String results) {
    return findByUUID(uuidString)
            .doOnNext(analysis -> analysis.setValue(results))
            .doOnNext(this::save)
            .then();
}

private Mono<Data> save(Data data) {
    return Mono.fromCallable(() -> {
        System.out.println("Will not print");
        return data;
    });
}

private Mono<Data> findByUUID(String uuidString) {
    return Mono.just(new Data());
}

private static class Data {
    private String value;

    public void setValue(String value) {
        this.value = value;
    }
}
Run Code Online (Sandbox Code Playgroud)

上面的示例中savecallable一个将返回producer. 但是如果我们运行上面的函数,您会发现它print永远不会被执行。

这与 的使用有关doOnNext。如果我们阅读它的文档,它会说:

添加 Mono 成功发出数据时触发的行为。首先执行 Consumer,然后将 onNext 信号传播到下游。

doOnNext接受Consumer返回 void 的 a。如果我们看一下,doOnNext我们会发现函数描述如下:

public final Mono<T> doOnNext(Consumer<? super T> onNext)`
Run Code Online (Sandbox Code Playgroud)

这意味着它接受一个消费者 aT或扩展 aT并返回 a Mono<T>。因此,为了长话短说,您可以看到它消耗了一些东西,但也返回了相同的东西。

这意味着这通常用于所谓的基本上是在不阻碍电流的一侧side effects完成的事情。其中之一可以例如记录。日志记录是需要消耗字符串并记录它的事情之一,而我们希望保持字符串在我们的程序中流动。或者也许我们想在侧面增加一个数字。或者修改某处的某些状态。您可以在这里阅读有关副作用的所有信息。

你可以这样直观地思考它:

     _____ side effect (for instance logging)
    /
___/______ main reactive flow
Run Code Online (Sandbox Code Playgroud)

这就是为什么你的第一个doOnNext设置器起作用,因为你正在修改侧面的状态,你正在你的类上设置值,从而修改你的类的状态以具有值。

另一方面,第二条语句“保存”不会被执行。您会看到该函数实际上返回了我们需要处理的内容。

它看起来是这样的:

      save
     _____ 
    /     \   < Broken return
___/             ____ no main reactive flow
Run Code Online (Sandbox Code Playgroud)

我们所要做的实际上只是更改一行:

// From
.doOnNext(this::save)

// To
.flatMap(this::save)
Run Code Online (Sandbox Code Playgroud)

flatMap获取 中的任何内容Mono,然后我们可以使用它来执行某些内容,然后返回一个“新”内容。

所以我们的流程(使用 flatMap)现在看起来像这样:

   setValue()    save()
    ______       _____ 
   /            /     \   
__/____________/       \______ return to client
Run Code Online (Sandbox Code Playgroud)

因此,通过使用,flatMap我们现在可以保存并返回从该函数返回的任何内容,从而触发链的其余部分。

如果您选择忽略从返回的任何内容,那么flatMap它的做法完全正确,就像您调用then它所做的那样

返回一个 Mono,它只重放来自此的完整信号和错误信号

一般规则是,在完全响应式应用程序中,您永远不应该阻塞

subscribe除非您的申请是最终的,否则您通常不会这样做consumer。这意味着如果您的应用程序启动了请求,那么您就是consumer其他东西的,所以您subscribe. 如果网页从请求开始,那么他们就是最终的consumer,他们正在订阅。

如果您在生成数据的应用程序中订阅,就像您在经营一家面包店并同时吃烤面包一样。

不要这样做,这对生意不利:D