通过拒绝确认重读来自 Kafka 主题的消息

Ali*_*der 5 spring-integration apache-kafka

我正在使用spring-integration-kafka.

使用了此示例中的代码。

我试图实现的是,当抛出异常时,不应将确认发送回 Kafka(即不应执行偏移提交),因此下一个fromKafka.receive(10000)方法调用将返回与前一个方法相同的消息。

但是我遇到了一个问题:即使确认没有发送到 Kafka,消费者也知道下一条消息的偏移量并继续读取新消息,尽管偏移主题中的偏移值保持不变。

如果出现某些故障,如何让消费者重读消息?

Gar*_*ell 2

目前不支持重新获取失败的消息

您可以做的一件事是在消息驱动适配器的下游添加重试(例如使用请求处理程序重试建议)。

如果不确认,消息将在重新启动后传递,但不会在当前实例化期间传递。

由于消息被预取到适配器中,因此您可以做的一件事就是检测故障,停止适配器,耗尽预取的消息并重新启动。

您可以注入一个自定义ErrorHandler来停止适配器并向下游流发出信号,指示它应该忽略耗尽消息。

编辑

现在有一个SeekToCurrentErrorHandler