Reactor onErrorContinue 运算符是否让原始序列继续?

ank*_*sh 5 java project-reactor reactor-kafka

Reactor 错误处理文档 ( https://projectreactor.io/docs/core/3.4.10/reference/index.html#error.handling ) 指出错误处理运算符不会让原始序列继续。

在了解错误处理运算符之前,您必须记住,反应序列中的任何错误都是终止事件。即使使用错误处理运算符,它也不会让原始序列继续。相反,它将 onError 信号转换为新序列(后备序列)的开始。换句话说,它替换了其上游终止的序列。

但 onErrorContinue 的 javadoc 声明如下(https://projectreactor.io/docs/core/3.4.10/api/index.html) -

通过从序列中删除有罪的元素并继续处理后续元素,让上游兼容运算符从错误中恢复。

onErrorContinue 不被视为“错误处理运算符”吗?

它似乎确实允许原始序列继续 -

        Flux.range(1, 5)
                .map(i -> {
                    if (i == 3) {
                        throw new RuntimeException("Forcing exception for " + i);
                    }
                    return i;
                })
                .doOnNext(i -> System.out.println(i))
                .onErrorContinue((throwable, o) -> System.err.println("Error while processing " + o + " - " + throwable.getMessage()))
                .subscribe();
Run Code Online (Sandbox Code Playgroud)

结果(删除了 3 个但继续后续元素)

1
2
4
5
Error while processing 3 - Forcing exception for 3

Process finished with exit code 0
Run Code Online (Sandbox Code Playgroud)

该文档确实指出 onErrorContinue 依赖于操作员支持。有没有其他方法可以让原始序列(源 Flux)继续适用于所有操作员?我不想在出现错误时用备用通量来替换我的源通量(onErrorResume 行为) - 我只想忽略问题元素并继续使用源通量。

编辑1(我的用例)

我有一个反应堆卡夫卡源通量&我想无限地消耗它,无论错误如何。我正在使用 onErrorContinue,但根据这篇文章收到的反馈,我已将其替换为 onErrorResume。下面是我此时的代码,但我不确定它是否在所有情况下都有效(通过“工作”,我从 kafka 连续流式传输,无论有任何错误)。请问有什么建议吗?

        KafkaReceiver.create(receiverOptions)
                .receive()
                .flatMap(record -> processRequest(record.value())
                        .doOnNext(e -> record.receiverOffset().acknowledge())
                        .doOnError(e -> {
                            System.err.println("Error occurred for msg: " + record.value() + ", Error " + e);
                            record.receiverOffset().acknowledge();
                        })
                        .onErrorResume(e -> Mono.empty()))
                .repeat(() -> true)
                .retryWhen(Retry.indefinitely())
                .doFinally(signalType -> {
                    //dont expect control to ever reach here
                    System.err.println("KafkaReceiverFlux terminating with Signal type: " + signalType);
                })
                .subscribe();
Run Code Online (Sandbox Code Playgroud)

Mic*_*rry 4

reactor 遵循的反应式流规范规定流中的所有错误都是终端事件 - 这就是reactor 错误处理文档的基础。为了处理错误,必须发生错误,并且根据规范,该错误必须是最终错误。在所有符合规范的情况下(几乎是所有情况)都是如此。

onErrorContinue()然而,是一种相当特殊的运算符。它是一个错误处理运算符,但它通过允许删除错误并继续流来破坏响应式规范。当您希望连续处理且永不停止且带有错误侧通道的情况下,它可能很有用。

话虽这么说,它有很多问题 - 不仅需要特定的操作员支持(因为完全符合反应流规范的操作员可能完全忽视,onErrorContinue()同时仍然保持合规),而且还有一大堆其他问题。如果您对一些背景阅读感兴趣,我们中的一些人在这里讨论了这些内容。将来它可能会被转移到一个unsafe()分组或类似的组中,但这是一个非常难以解决的问题。

话虽如此,核心建议是目前 Javadoc中的建议,除了非常具体的情况外,不要在所有情况下使用onErrorContinue(),而是onErrorResume()在每个单独的发布者上使用,如下所示:

//Stream
.flatMap(id -> repository.retrieveById(id)
      .doOnError(System.err::println)
      .onErrorResume(e -> Mono.empty()))
Run Code Online (Sandbox Code Playgroud)

这会带来更大的冗长性,并且可能会带来较小的性能损失(我没有验证过),但其优点是其行为更加清晰,不会破坏反应流规范,并且不需要特定的操作员支持即可工作。这就是我在几乎所有情况下都会推荐的——我个人觉得onErrorContinue()在大多数情况下其微妙之处太复杂而难以推理。