ank*_*sh 5 java project-reactor reactor-kafka
Reactor 错误处理文档 ( https://projectreactor.io/docs/core/3.4.10/reference/index.html#error.handling ) 指出错误处理运算符不会让原始序列继续。
在了解错误处理运算符之前,您必须记住,反应序列中的任何错误都是终止事件。即使使用错误处理运算符,它也不会让原始序列继续。相反,它将 onError 信号转换为新序列(后备序列)的开始。换句话说,它替换了其上游终止的序列。
但 onErrorContinue 的 javadoc 声明如下(https://projectreactor.io/docs/core/3.4.10/api/index.html) -
通过从序列中删除有罪的元素并继续处理后续元素,让上游兼容运算符从错误中恢复。
onErrorContinue 不被视为“错误处理运算符”吗?
它似乎确实允许原始序列继续 -
Flux.range(1, 5)
.map(i -> {
if (i == 3) {
throw new RuntimeException("Forcing exception for " + i);
}
return i;
})
.doOnNext(i -> System.out.println(i))
.onErrorContinue((throwable, o) -> System.err.println("Error while processing " + o + " - " + throwable.getMessage()))
.subscribe();
Run Code Online (Sandbox Code Playgroud)
结果(删除了 3 个但继续后续元素)
1
2
4
5
Error while processing 3 - Forcing exception for 3
Process finished with exit code 0
Run Code Online (Sandbox Code Playgroud)
该文档确实指出 onErrorContinue 依赖于操作员支持。有没有其他方法可以让原始序列(源 Flux)继续适用于所有操作员?我不想在出现错误时用备用通量来替换我的源通量(onErrorResume 行为) - 我只想忽略问题元素并继续使用源通量。
编辑1(我的用例)
我有一个反应堆卡夫卡源通量&我想无限地消耗它,无论错误如何。我正在使用 onErrorContinue,但根据这篇文章收到的反馈,我已将其替换为 onErrorResume。下面是我此时的代码,但我不确定它是否在所有情况下都有效(通过“工作”,我从 kafka 连续流式传输,无论有任何错误)。请问有什么建议吗?
KafkaReceiver.create(receiverOptions)
.receive()
.flatMap(record -> processRequest(record.value())
.doOnNext(e -> record.receiverOffset().acknowledge())
.doOnError(e -> {
System.err.println("Error occurred for msg: " + record.value() + ", Error " + e);
record.receiverOffset().acknowledge();
})
.onErrorResume(e -> Mono.empty()))
.repeat(() -> true)
.retryWhen(Retry.indefinitely())
.doFinally(signalType -> {
//dont expect control to ever reach here
System.err.println("KafkaReceiverFlux terminating with Signal type: " + signalType);
})
.subscribe();
Run Code Online (Sandbox Code Playgroud)
reactor 遵循的反应式流规范规定流中的所有错误都是终端事件 - 这就是reactor 错误处理文档的基础。为了处理错误,必须发生错误,并且根据规范,该错误必须是最终错误。在所有符合规范的情况下(几乎是所有情况)都是如此。
onErrorContinue()
然而,是一种相当特殊的运算符。它是一个错误处理运算符,但它通过允许删除错误并继续流来破坏响应式规范。当您希望连续处理且永不停止且带有错误侧通道的情况下,它可能很有用。
话虽这么说,它有很多问题 - 不仅需要特定的操作员支持(因为完全符合反应流规范的操作员可能完全忽视,onErrorContinue()
同时仍然保持合规),而且还有一大堆其他问题。如果您对一些背景阅读感兴趣,我们中的一些人在这里讨论了这些内容。将来它可能会被转移到一个unsafe()
分组或类似的组中,但这是一个非常难以解决的问题。
话虽如此,核心建议是目前 Javadoc中的建议,除了非常具体的情况外,不要在所有情况下使用onErrorContinue()
,而是onErrorResume()
在每个单独的发布者上使用,如下所示:
//Stream
.flatMap(id -> repository.retrieveById(id)
.doOnError(System.err::println)
.onErrorResume(e -> Mono.empty()))
Run Code Online (Sandbox Code Playgroud)
这会带来更大的冗长性,并且可能会带来较小的性能损失(我没有验证过),但其优点是其行为更加清晰,不会破坏反应流规范,并且不需要特定的操作员支持即可工作。这就是我在几乎所有情况下都会推荐的——我个人觉得onErrorContinue()
在大多数情况下其微妙之处太复杂而难以推理。