Java Reactor - 条件流执行

Wic*_*cia 5 java reactive-programming project-reactor

我想知道如何使用 Reactor 创建“逻辑流”。

让我们假设我想实现以下场景:

作为输入,我有对象要保存在数据库中。作为输出,我想获得 Mono 表示执行消息。

  • 选项 1: 如果要保存的对象已填充所有字段,则我执行附加操作,将其保存到数据库并最终返回“ Success”消息

  • 选项 2:如果要保存的对象至少有一个字段未填写,我返回“ Error

我已经创建了这样的代码:

Mono<String> message = Mono.just(new User("Bob the Reactor master")) // user with name = can be saved
    .flatMap(user -> {
        if(user.getName() != null && user.getName().length() > 1){
            // Perform additional operations e.g. user.setCreatedDate(new Date())
            // Save to repository e.g. repository.save(user)
            return Mono.just("Success!");
        }
        else{
            return Mono.just("Error!");
        }
    })
    .doOnNext(System.out::println); // print stream result

message.subscribe();
Run Code Online (Sandbox Code Playgroud)

这段代码是 100% 反应性的吗(有它的所有好处)?如果没有,那么它会是什么样子?

Ale*_*kin 3

答案取决于您评论的存储库。

  • 存储库是非阻塞的并返回 Mono 或 Flux

    您应该订阅它然后返回 Success Mono。在你的 if 语句中:

    return repository.save(user).then(Mono.just("Success!"));
    
    Run Code Online (Sandbox Code Playgroud)
  • 存储库被阻止

    您应该使存储库调用非阻塞,将其执行移动到单独的线程。Reactor 方式是用 Mono 包装它并订阅弹性调度程序或您的自定义调度程序。

    Mono<String> message = Mono.just(new User("Bob the Reactor master"))
            .filter(user -> user.getName() != null && user.getName().length() > 1)
            .map(user -> user) // Perform additional operations e.g. user.setCreatedDate(new Date())
            .flatMap(user -> Mono.fromRunnable(() -> repository.save(user))
                    .subscribeOn(Schedulers.elastic())
                    .then(Mono.just("Success!")))
            .switchIfEmpty(Mono.just("Error!"));
    
    Run Code Online (Sandbox Code Playgroud)