Ali*_*der 5 spring-integration apache-kafka
我正在使用spring-integration-kafka.
使用了此示例中的代码。
我试图实现的是,当抛出异常时,不应将确认发送回 Kafka(即不应执行偏移提交),因此下一个fromKafka.receive(10000)方法调用将返回与前一个方法相同的消息。
但是我遇到了一个问题:即使确认没有发送到 Kafka,消费者也知道下一条消息的偏移量并继续读取新消息,尽管偏移主题中的偏移值保持不变。
如果出现某些故障,如何让消费者重读消息?
目前不支持重新获取失败的消息
您可以做的一件事是在消息驱动适配器的下游添加重试(例如使用请求处理程序重试建议)。
如果不确认,消息将在重新启动后传递,但不会在当前实例化期间传递。
由于消息被预取到适配器中,因此您可以做的一件事就是检测故障,停止适配器,耗尽预取的消息并重新启动。
您可以注入一个自定义ErrorHandler来停止适配器并向下游流发出信号,指示它应该忽略耗尽消息。
编辑
现在有一个SeekToCurrentErrorHandler。
| 归档时间: |
|
| 查看次数: |
2604 次 |
| 最近记录: |