在kafka消费者中重试逻辑

TEC*_*007 6 apache-kafka kafka-consumer-api kafka-producer-api

我有一个用例,我从队列中消耗某些日志并使用该日志中的一些信息命中某些第三方API,以防第三方系统没有正确响应我希望为该特定日志实现重试逻辑.

我可以添加一个时间字段并将消息重新发送到同一队列,如果其时间字段有效(即小于当前时间),则此消息将再次消耗,如果不是,则再次将其推送到队列中.

但是这个逻辑会一次又一次地添加相同的日志,直到重试时间正确并且队列将不必要地增长.

是否有更好的方法在Kafka中实现重试逻辑?

Pav*_*ave 8

您可以创建多个重试主题并在那里推送失败的任务。例如,您可以创建 3 个具有不同延迟(以分钟为单位)的主题,并轮换单个失败的任务,直到达到最大尝试限制。

'retry_5m_topic'?—? 5 分钟后重试

'retry_30m_topic'?—? 30 分钟后重试

'retry_1h_topic'?—? 1 小时后重试

查看更多详情:https : //blog.pragmatists.com/retrying-consumer-architecture-in-the-apache-kafka-939ac4cb851a


小智 1

在消费者中,如果它抛出异常,则会生成另一条尝试编号为1的消息。因此,下次使用它时,它具有尝试编号为1的属性。在生产者中处理它,如果它尝试的次数超过了您的重试次数,然后停止生产。