Kafka增加处理超时消息

Mni*_*usr 4 apache-kafka kafka-consumer-api

在kafka消费者中,如果消息的处理时间超过5分钟,则重新处理消息,我已配置消费者增加“max.poll.interval.ms”和session.timeout.ms

max.poll.interval.ms= 7200000 (2 小时) session.timeout.ms= 7200000 (2 小时) request.timeout.ms=7206000 (~2 小时)

这首先起作用,之后,它正在重新平衡我在消费者上遇到了这个错误

2019-10-18 15:29:51.739 INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=processgroup] (Re-)joining
Run Code Online (Sandbox Code Playgroud)

团体

sun*_*007 5

你的方法是正确的。您还需要在消费者中 设置session.timeout.ms ,在Kafka代理中设置group.max.session.timeout.ms以避免重新平衡。

订阅一组主题后,消费者将在调用 poll(long) 时自动加入该组。民意调查 API 旨在确保消费者的活跃度。只要继续调用轮询,消费者就会留在组中,并继续从分配给它的分区接收消息。在幕后,消费者会定期向服务器发送心跳。如果消费者崩溃或在session.timeout.ms时间内无法发送心跳,则消费者将被视为死亡,其分区将被重新分配。

max.poll.interval.ms:使用消费者组管理时调用 poll() 之间的最大延迟。如果在此超时到期之前未调用 poll(),则消费者被视为失败,并且组将重新平衡,以便将分区重新分配给另一个成员。

注意:请注意,如果最大轮询增加很多,它将延迟组重新平衡,因为只有在调用轮询时消费者才会重新平衡加入。

request.timeout.ms:该配置控制客户端等待请求响应的最长时间。如果在超时之前未收到响应,则客户端将在必要时重新发送请求,或者在重试次数耗尽时使请求失败。

注意:这不会间接影响重新平衡,但需要设置,因为它必须始终大于 max.poll.interval.ms,否则会引发配置错误。

session.timeout.ms:使用 Kafka 的组管理工具时用于检测消费者故障的超时时间。消费者定期发送心跳以向代理表明其活跃度。如果在此会话超时到期之前代理没有收到心跳,则代理将从组中删除该消费者并启动重新平衡。

注意:但是设置 session.timeout.ms 有点棘手,单独使用它是行不通的,您需要检查 Kafka 代理设置 group.min.session.timeout.ms 也需要增加。

group.max.session.timeout.ms(在Kafka代理中):注册消费者允许的最大会话超时。其默认值为 30000 毫秒。