适用于Kafka 0.10.0.0及更高版本的session.timeout.ms和max.poll.interval.ms之间的差异

Dee*_*eps 36 apache-kafka kafka-consumer-api

我不清楚为什么我们需要session.timeout.ms和max.poll.interval.ms以及何时使用一个或另一个或两者?似乎两者都表明时间协调器的上限将等待消费者获得心跳,然后再将其假死.

另外,对于基于KIP-62的 0.10.1.0+版本,它的表现如何?

Mat*_*Sax 97

在KIP-62之前,只有session.timeout.ms(即Kafka 0.10.0和更早版本).max.poll.interval.ms通过KIP-62(卡夫卡的一部分0.10.1)介绍.

KIP-62,poll()通过背景心跳线程将心跳与呼叫分离.这允许poll()比心跳间隔更长的处理时间(即,两次连续之间的时间).

假设处理消息需要1分钟.如果耦合了心跳和轮询(即,在KIP-62之前),则需要设置session.timeout.ms大于1分钟以防止消费者超时.但是,如果消费者死亡,检测失败的消费者也需要超过1分钟.

KIP-62将轮询和心跳分离,允许在两次连续的民意调查中发送心跳.现在你有两个线程在运行,心跳线程和处理线程,因此,KIP-62为每个线程引入了一个超时.session.timeout.ms是用于max.poll.interval.ms处理线程的心跳线程.

session.timeout.ms=30000因此,假设您设置了消费者心跳线程必须在此时间到期之前向代理发送心跳.另一方面,如果处理单个消息需要1分钟,则可以设置max.poll.interval.ms大于一分钟的时间,以便为处理线程提供更多时间来处理消息.

如果处理线程死亡,则需要max.poll.interval.ms检测到这一点.但是,如果整个消费者死亡(并且一个垂死的处理线程很可能崩溃整个消费者,包括心跳线程),它只需要session.timeout.ms检测它.

这个想法是,即使处理本身需要很长时间,也可以快速检测出失败的消费者.

  • 假设您的消费者死亡(或者存在一个无限循环的错误),但是后台线程保持着心跳。对于这种情况,不会有任何进展,但是不会被发现。因此,“ max.poll.interval.ms”是对主处理线程的健康检查-具有两个配置,可让您快速检测“硬故障”(心跳和主线程死亡),并长期简化代码处理(使用单个配置,您将有很长的滞留时间或复杂的代码来在“手动”处理期间触发心跳) (2认同)