如何使用反应式编程实现嵌套的异步代码?

Pro*_*tle 1 java asynchronous reactive-programming project-reactor

我对反应式编程非常陌生。尽管我对函数式编程和Kotlin协程非常熟悉,但是我仍然无法弄清楚如何使用反应式编程范例来重构普通的嵌套CRUD代码,尤其是那些具有嵌套异步操作的代码。

例如,以下是基于Java 8的简单异步CRUD代码段 CompletableFuture


        getFooAsync(id)
                .thenAccept(foo -> {
                    if (foo == null) {
                        insertFooAsync(id, new Foo());
                    } else {
                        getBarAsync(foo.bar)
                                .thenAccept(bar -> {
                                   updateBarAsync(foo, bar);
                                });
                    }
                });

Run Code Online (Sandbox Code Playgroud)

使用Kotlin协程进行重构非常容易,这使得它在不失去异步性的情况下更具可读性。

 val foo = suspendGetFoo(id)
 if(foo==null) {
   suspendInsertFoo(id, Foo())
 } else {
   val bar = suspendGetBar(foo.bar)
   suspendUpdateBar(foo, bar);-
}
Run Code Online (Sandbox Code Playgroud)

但是,这样的代码是否适合于反应式编程?

如果是这样,给定一个Flux<String> idFlux,如何使用Reactor 3对其进行重构?

它是一个好主意,只需更换所有的CompletableFutureMono

Phi*_*lay 8

像这样的代码适合于反应式编程吗?

恕我直言,Kotlin协程更适合此用例,并导致更简洁的代码。

但是,您可以在反应式流中执行此操作。

用Mono替换每个CompletableFuture是一个好主意吗?

我发现反应式流非常好地处理了许多异步用例(例如Project Reactor示例)。但是,肯定有一些用例不太合适。因此,我不推荐使用被动流替换每个 策略的策略CompletableFuture

但是,必须背离的一种情况CompletableFuture是需要背压时。

关于使用哪种异步模式的很多决定取决于您使用的语言/框架/工具/库以及您和您的团队成员对它们的适应程度。如果您使用的库具有Kotlin的良好支持,并且您的团队熟悉Kotlin,请使用协程。对于反应性流同样如此。

给定一个Flux<String> idFlux,如何使用Reactor 3进行重构?

在考虑此用例的反应流时,请牢记以下几点:

  1. 反应性流无法发射null。相反,Mono通常使用空值。(从技术上讲Mono<Optional<...>>,您也可以使用,但那时候您只是在伤脑筋,乞求错误)
  2. 当一个Mono是空的,lambda表达式传递到任何操作者与所述交易onNext信号(例如.map.flatMap.handle,等)是不被调用。请记住,您正在处理数据流(而不是命令性控制流)
  3. .switchIfEmpty.defaultIfEmpty运营商可以在空操作Mono秒。但是,它们不提供else条件。下游操作员不知道该流以前是空的(除非从Publisher传递到的元素.switchIfEmpty很容易以某种方式识别)
  4. 如果您有多个运算符的流,并且多个运算符可能导致该流变空,那么下游操作员很难/不可能确定该流为何变空。
  5. 主异步操作符,其允许从上游运营商处理发射值.flatMap.flatMapSequential.concatMap。您将需要使用它们来链接对先前异步操作的输出进行操作的异步操作。
  6. 由于您的用例未返回值,因此反应流实现将返回Mono<Void>

综上所述,下面是将您的示例转换为3号反应堆的尝试(有一些警告):

    Mono<Void> updateFoos(Flux<String> idFlux) {
        return idFlux                                         // Flux<String>
            .flatMap(id -> getFoo(id)                         // Mono<Foo>
                /*
                 * If a Foo with the given id is not found,
                 * create a new one, and continue the stream with it.
                 */
                .switchIfEmpty(insertFoo(id, new Foo()))      // Mono<Foo>
                /*
                 * Note that this is not an "else" condition
                 * to the above .switchIfEmpty
                 *
                 * The lambda passed to .flatMap will be
                 * executed with either:
                 * A) The foo found from getFoo
                 *    OR
                 * B) the newly inserted Foo from insertFoo
                 */
                .flatMap(foo -> getBar(foo.bar)               // Mono<Bar>
                    .flatMap(bar -> updateBar(foo, bar))      // Mono<Bar>
                    .then()                                   // Mono<Void>
                )                                             // Mono<Void>
            )                                                 // Flux<Void>
            .then();                                          // Mono<Void>
    }

    /*
     * @return the Foo with the given id, or empty if not found
     */
    abstract Mono<Foo> getFoo(String id);

    /*
     * @return the Bar with the given id, or empty if not found
     */
    abstract Mono<Bar> getBar(String id);

    /*
     * @return the Foo inserted, never empty
     */
    abstract Mono<Foo> insertFoo(String id, Foo foo);

    /*
     * @return the Bar updated, never empty
     */
    abstract Mono<Bar> updateBar(Foo foo, Bar bar);
Run Code Online (Sandbox Code Playgroud)

这是一个更复杂的示例,该示例使用Tuple2<Foo,Boolean>来指示是否找到了原始的Foo(这在语义上应与您的示例等效):

    Mono<Void> updateFoos(Flux<String> idFlux) {
        return idFlux                                         // Flux<String>
            .flatMap(id -> getFoo(id)                         // Mono<Foo>
                /*
                 * Map to a Tuple2 whose t2 indicates whether the foo was found.
                 * In this case, it was found.
                 */
                .map(foo -> Tuples.of(foo, true))             // Mono<Tuple2<Foo,Boolean>>
                /*
                 * If a Foo with the given id is not found,
                 * create a new one, and continue the stream with 
                 * a Tuple2 indicating it wasn't originally found
                 */
                .switchIfEmpty(insertFoo(id, new Foo())       // Mono<Foo>
                    /*
                     * Foo was not originally found, so t2=false
                     */
                    .map(foo -> Tuples.of(foo, false)))       // Mono<Tuple2<Foo,Boolean>>
                /*
                 * The lambda passed to .flatMap will be
                 * executed with either:
                 * A) t1=foo found from getFoo, t2=true
                 *    OR
                 * B) t1=newly inserted Foo from insertFoo, t2=false
                 */
                .flatMap(tuple2 -> tuple2.getT2()
                    // foo originally found 
                    ? getBar(tuple2.getT1().bar)              // Mono<Bar>
                        .flatMap(bar -> updateBar(tuple2.getT1(), bar)) // Mono<Bar>
                        .then()                               // Mono<Void>
                    // foo originally not found (new inserted)
                    : Mono.empty()                            // Mono<Void>
                )
            )                                                 // Flux<Void>
            .then();                                          // Mono<Void>
    }

Run Code Online (Sandbox Code Playgroud)