Car*_*tor 7 spring apache-kafka spring-kafka
我们目前基本上通过以下简化机制确认消息:
@KafkaListener(topics = "someTopic")
public void listen(final String message, final Acknowledgment ack) {
try {
processMessage(message);
ack.acknowledge();
} catch (final IOException e) {
// do not acknowledge here since we can temporarily not process the message
}
Run Code Online (Sandbox Code Playgroud)
基本上,每当我们暂时无法处理消息时(在IOExceptions的情况下),我们希望以后再次接收它.
但这不起作用,因为确认假定同一分区中的所有先前消息都已成功处理.在我们的IOException情况下,将跳过失败的消息,但可能会被同一分区上具有更高索引的其他消息确认.
我们有一些想法如何解决这个问题,但这将意味着一些讨厌的解决方法,以避免直接在KafkaListener方法中调用确认.我们的用例是非常具体的,还是更像是春天kafka用户会假设的"默认"行为?
是否有针对此类问题的spring-kafka解决方案?或者你有想法"正确"解决这个问题吗?
这就是卡夫卡的工作方式;您可以启用重试,以在继续处理下一条消息之前尝试克服 IOException。
您可以配置错误句柄以将失败消息发布到另一个主题以便稍后重播。或者,错误处理程序可以停止容器以阻止任何新的交付。
当容器重新启动时,将重播该消息。
归档时间: |
|
查看次数: |
1615 次 |
最近记录: |