在Spring Data上订阅两次MongoDB save()会导致双重插入

And*_*ing 0 spring spring-mvc spring-data-mongodb project-reactor reactive

我们遇到了以下我们理解的行为,但是我们想知道它是否是预期的,以及将它记录为某种陷阱可能是有意义的.

我们正在试验Spring Boot 2/Spring WebFlux并设置一个基本上有这样的东西的小应用程序(全部缩短):

@PostMapping
public Mono<Todo> addTodos( @RequestBody Person person ) {
    return personService.addPerson( person );
}
Run Code Online (Sandbox Code Playgroud)

该服务首先看起来像这样,因为我们想要将一个人的事件也发布到消息队列:

public class PersonService {
    public Mono<Person> addPerson( Person person ) {
        Mono<Person> addedPerson = personRepository.save( person );
        addedPerson.subscribe( p -> rabbitTemplate.convertAndSend( "persons", p ) );
        return addedPerson;
    }
}
Run Code Online (Sandbox Code Playgroud)

所以,这样做显然是错误的.该.subscribe()触发器的流动和我们假设反应REST控制器确实在背景相同的序列化的数据的响应,从而导致第二平行流动之前.最后,我们最终persons在数据库中的集合中有两个重复的条目.

在这个冗长的介绍之后,最后的问题是:这是多个订阅者触发多个插入的预期行为(基本上,如果您订阅n时间,您会获得n插入)?

如果是的话,这对于初学者来说可能是一个陷阱,特别是如果我们的理解是正确的,那么反应式REST控制器就会发挥作用.subscribe().

mp9*_*1de 5

你自己得出了描述预期行为的结论.

反应式编程模型与命令式编程模型在各个领域不同.

命令式编程结合了转换,映射,执行和其他方面.您可以通过创建条件/循环流,方法调用来表达这些,这些方法调用可以返回值并将值传递给API调用.

反应式编程解耦的声明什么的情况发生怎么它会被执行.使用反应式基础设施的执行分为两部分:反应序列组成和实际执行.在您的代码中,您只编写反应序列.执行发生在您的代码之外.

当你编写一个Publisher,然后结果Publisher包含一个事件的声明,如果执行将发生.A Publisher并不意味着它是否会首先执行,也不会最终订阅多少订阅者.

从上面的示例中,Mono<Person> PersonRepository.save(…)返回一个发布者:

  1. 将数据映射PersonDocument
  2. 保存Document到MongoDB和
  3. Person一旦MongoDB的响应回来,就会保存

这是使用特定存储库方法保存数据的方法.创建发布者不会执行发布者,并且发布者不会就执行次数发表意见.多次调用以.subscribe()多次执行发布者.

我认为.subscribe()这不是一个陷阱.反应式编程模型方法会使执行失败.如果你打电话.subscribe().block(),那么你应该有充分的理由这样做.每当您看到.subscribe().block()在您的代码中,您应该特别注意这是否正确.您的执行环境负责订阅Publishers.


一些观察:

  • RabbitTemplate是一个阻止API.您不应混合使用反应和阻止API.如果您没有其他选项,则卸载阻止对工作人员的调用.二者必选其一publishOn(…)沿着Scheduler包含阻塞工作或使用的实际操作前ExecutorService/ CompletableFuture一起flatMap(…).
  • 使用flatMap(…)操作符作为Mono/的反应流组成Flux.该flatMap(…)运营商开始无阻塞的子过程,最终和完整继续流动.
  • 当发布者发出特定信号时,使用doOnXXX(…)运算符(doOnNext(…),, doOnSuccess(…)...)进行回调.这些钩子方法允许方便地拦截元素非阻塞消耗.

参考文献: