项目反应堆:flatMap之后的onErrorResume

G.D*_*rov 0 java reactor project-reactor reactive-streams

Flux.just("a", "b")
        .flatMap(s -> s.equals("a") ? Mono.error(new RuntimeException() : Flux.just(s + "1", s + "2"))
        .onErrorResume(throwable -> Mono.empty())
        .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

你好!

在这里,我对两个元素进行了处理,然后通过flatMap将第一个暴露给异常,第二个暴露给另一个Flux。

随着onErrorResume我期望的输出

b1
b2
Run Code Online (Sandbox Code Playgroud)

但一无所获。有人可以解释为什么会发生吗?

谢谢。

Jan*_*Jan 6

这个问题已经给出了codependent为什么会发生这种情况的可靠答案。要回答一些题外话如何实现预期输出

onErrorResume调用必须移动到flatMap

Flux.just("a", "b")
    .flatMap(s ->
        (s.equals("a") ? Mono.error<RuntimeException>(RuntimeException()) : Flux.just(s + "1", s + "2"))
             .onErrorResume(ex -> Mono.empty())
    )
    .subscribe(System.out::println)
Run Code Online (Sandbox Code Playgroud)

这样输出是预期的

b1
b2
Run Code Online (Sandbox Code Playgroud)


cod*_*ent 5

鉴于这种:

Flux.just("a", "b", "c")
        .flatMap { s ->
            if (s == "b") 
                Mono.error<RuntimeException>(RuntimeException()) 
            else 
                Flux.just(s + "1", s + "2")
        }.onErrorResume { throwable -> Mono.just("d") }.log()
        .subscribe { println(it) }
Run Code Online (Sandbox Code Playgroud)

输出为:

12:35:19.673 [main] INFO reactor.Flux.OnErrorResume.1 - onSubscribe(FluxOnErrorResume.ResumeSubscriber)
12:35:19.676 [main] INFO reactor.Flux.OnErrorResume.1 - request(unbounded)
12:35:19.677 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(a1)
a1
12:35:19.677 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(a2)
a2
12:35:19.712 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(d)
d
12:35:19.713 [main] INFO reactor.Flux.OnErrorResume.1 - onComplete()
Run Code Online (Sandbox Code Playgroud)

这里发生了什么?onErrorResume() 应用于flatMap()操作员返回的发布者。由于发布者在“ b”上表示失败,因此 flatMap()不再执行,onErrorResume()操作员将继续使用其后备进行发布。

onErrorResume()文档清楚地表明原始发布者由于该错误而完成,并且后备接管了:

在此处输入图片说明