弹簧反应器中的错误处理未按预期工作

Rak*_*Pai 1 java spring project-reactor

我无法弄清楚为什么即使 tryDemo 方法抛出 NPE 也没有执行 doOnError 代码。我正在尝试了解反应式编程中的错误处理,我对此很陌生

Mono.zip( Mono.fromCallable( ()->tryDemo()), Mono.fromCallable( ()-   >tryDemo1()),Mono.fromCallable(()-> tryDemo2() ))
        .flatMap( data -> Mono.just( Tuples.of( data.getT1(), data.getT2(),data.getT3() ) ) )
        .doOnError( e -> log.error( "Error {}", e.getStackTrace() ) )
        .subscribe(T->{log.info("Tuple {}",T.getT2()  );});



public Mono<String> tryDemo() {
    log.info( "Data--1" );

    return Mono.error( NullPointerException::new );
    //return Mono.just( "1" );
}

15:56:07.023 [main] INFO com.infosys.rtbm.Test - Data--1
15:56:07.027 [main] INFO com.infosys.rtbm.Test - Tuple MonoJust
Run Code Online (Sandbox Code Playgroud)

Ole*_*uka 5

不要忘记订阅我

在您的示例中,您返回Mono错误。这里棘手的部分是fromCallable期望您返回一个标量值。

如果我们查看 的 API Mono.fromCallable,我们会发现接受的参数是

public static <T> Mono<T> fromCallable(Callable<? extends T> supplier)
Run Code Online (Sandbox Code Playgroud)

这意味着如果我们Callable返回Mono,我们将得到

Mono<Mono<Object>> monoOfMono = Mono.fromCallable(() -> 
     Mono.error(NullPointerException::new)
);
Run Code Online (Sandbox Code Playgroud)

因此,如果我们需要产生错误,我们必须直接在 lambda 中抛出该异常

Mono<Object> justMono = Mono.fromCallable(() -> {
     throws new NullPointerException()
});
Run Code Online (Sandbox Code Playgroud)

对于 Sumup,Mono.fromCallable不尝试检查返回类型是否为流。因此,您Mono被视为正常标量值并向下游传播。因此,要解决此问题,您可以执行以下操作:

在您的tryDemo方法中引发异常:

Mono.zip( Mono.fromCallable( ()->tryDemo()), Mono.fromCallable( ()-   >tryDemo1()),Mono.fromCallable(()-> tryDemo2() ))
    .flatMap( data -> Mono.just( Tuples.of( data.getT1(), data.getT2(),data.getT3() ) ) )
    .doOnError( e -> log.error( "Error {}", e.getStackTrace() ) )
    .subscribe(T->{log.info("Tuple {}",T.getT2()  );});



public String tryDemo() {
    log.info( "Data--1" );

    throw new NullPointerException();
    //return "1";
}
Run Code Online (Sandbox Code Playgroud)

替换fromCallabledefer

您可能会实现懒惰,这是您想要实现的fromCallable,使用Mono.defer运算符在这种情况下期望Mono作为 lambda 的返回类型。

如果我们查看该运算符的 API,我们将观察到以下内容

public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier);
Run Code Online (Sandbox Code Playgroud)

在这种情况下,我们有Supplierwhich 期望Mono作为返回类型的东西,所以一旦你再次尝试你的初始代码,你将实现预期的行为:

Mono<Object> justMono = Mono.defer(() -> 
     Mono.error(NullPointerException::new)
);
Run Code Online (Sandbox Code Playgroud)

在这种情况下,一旦供应商返回 a MonoMono.defere订阅它并收到错误信号:

Mono.zip( 
        Mono.defer(() -> tryDemo()), 
        Mono.defer(() -> tryDemo1()),
        Mono.defer(() -> tryDemo2())
    )
    .flatMap(data -> Mono.just(Tuples.of( 
         data.getT1(), 
         data.getT2(), 
         data.getT3()
    )))
    .doOnError( e -> log.error( "Error {}", e.getStackTrace() ) )
    .subscribe(T -> {
        log.info("Tuple {}",T.getT2()  );
    });
Run Code Online (Sandbox Code Playgroud)