Spring Reactor onErrorContinue 不起作用

Nir*_*ane 3 flux project-reactor spring-webflux

根据文档,我期望onErrorContinue将忽略错误元素并继续序列。以下测试用例因异常而失败

java.lang.AssertionError: expectation "expectNext(12)" failed (expected: onNext(12); actual: onError(java.lang.RuntimeException:
Run Code Online (Sandbox Code Playgroud)
    @Test
            public void testOnErrorContinue() throws InterruptedException {

                Flux<Integer> fluxFromJust = Flux.just(1, 2,3,4,5)
                        .concatWith(Flux.error(new RuntimeException("Test")))
                        .concatWith(Flux.just(6))
                        .map(i->i*2)
                        .onErrorContinue((e,i)->{
                            System.out.println("Error For Item +" + i );
                        })
                        ;
                StepVerifier
                        .create(fluxFromJust)
                        .expectNext(2, 4,6,8,10)
                        .expectNext(12)
                        .verifyComplete();
        }
Run Code Online (Sandbox Code Playgroud)

Mic*_*rry 13

onErrorContinue()可能没有按照您的想法行事 - 如果上游操作员碰巧支持这样做,它可以让上游操作员从可能发生的错误中恢复。这是一个相当专业的操作员。

在这种情况下map(),实际上确实支持onErrorContinue,但 map 实际上并没有产生错误 - 错误已经插入到流中(通过concat()显式Flux.error()调用。)换句话说,根本没有运算符产生错误,所以因此它没有什么可以恢复的,因为提供的元素是错误的。

如果您更改了流,从而map()实际上导致了错误,那么它会按预期工作:

Flux.just(1, 2,3,4,5)
        .map(x -> {
            if(x==5) {
                throw new RuntimeException();
            }
            return x*2;
        })
        .onErrorContinue((e,i)->{
            System.out.println("Error For Item +" + i );
        })
        .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

产生:

2
4
6
8
Error For Item +5
Run Code Online (Sandbox Code Playgroud)

根据实际用例的替代方法可能是onErrorResume()在可能有错误的元素(或元素源)之后使用:

Flux.just(1, 2, 3, 4, 5)
        .concatWith(Flux.error(new RuntimeException()))
        .onErrorResume(e -> {
            System.out.println("Error " + e + ", ignoring");
            return Mono.empty();
        })
        .concatWith(Flux.just(6))
        .map(i -> i * 2)
        .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

通常,使用另一个“onError”运算符(例如onErrorResume())通常是更常见且更推荐的方法,因为onErrorContinue()它依赖于运算符支持并影响上游,而不是下游运算符(这是不寻常的)。