use*_*803 6 apache-kafka spring-boot project-reactor reactor-kafka
我正在使用反应堆卡夫卡发送卡夫卡消息并接收和处理它们。在接收 kakfa 有效负载时,我会进行一些反序列化,如果出现异常,我只想记录该有效负载(通过保存到 mongo ),然后继续接收其他有效负载。
为此,我使用以下方法 -
@EventListener(ApplicationStartedEvent.class)
public void kafkaReceiving() {
for(Flux<ReceiverRecord<String, Object>> flux: kafkaService.getFluxReceives()) {
flux.delayUntil(//some function to do something)
.doOnNext(r -> r.receiverOffset().acknowledge())
.onErrorResume(this::handleException()) // here I'll just save to mongo
.subscribe();
}
}
private Publisher<? extends ReceiverRecord<String,Object>> handleException(object ex) {
// save to mongo
return Flux.empty();
}
Run Code Online (Sandbox Code Playgroud)
在这里,我希望每当我在接收有效负载时遇到异常时, onErrorResume 应该捕获它并记录到 mongo ,然后当我发送到 kafka 队列时我应该很好地继续接收更多消息。但是,我看到异常发生后,即使调用了 onErrorResume 方法,但我无法再处理发送到 Kakfa 主题的消息。我在这里可能缺少什么吗?
正如@bsideup也提到的,我最终没有从反序列化器抛出异常,因为kafka无法提交该记录的偏移量,并且没有干净的方法来忽略该记录并继续进一步消耗记录因为我们没有记录的偏移量信息(因为它格式错误)。因此,即使我尝试使用反应性错误运算符忽略记录,轮询也会获取相同的记录,然后消费者就会陷入困境
归档时间: |
|
查看次数: |
1955 次 |
最近记录: |