Dee*_*eps 3 kafka-consumer-api
https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html提到“只要消费者定期发送心跳,就假定它是活着的,并且正在处理消息从它的分区。实际上,轮询消息的行为是导致消费者发送这些心跳的原因。如果消费者停止发送心跳的时间足够长,它的会话将超时,组协调器将认为它已死并触发重新平衡.”
同样https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html指定“代理将通过使用心跳自动检测测试组中的失败进程机制。消费者会定期自动ping集群,让集群知道它是活着的。只要消费者能够做到这一点,它就被认为是活着的,并保留从分配给它的分区中消费的权利。如果它停止心跳的时间比 session.timeout.ms 长,然后它会被认为是死的,它的分区将被分配给另一个进程。”
在我的应用程序中,处理从前一个 poll() 接收的消息可能需要长达数小时才能调用另一个 poll()。注意:我禁用了自动提交,因为我并不总是知道处理所有以前的消息需要多长时间。
a) 这会导致组协调员认为消费者已死亡或不活动吗?
b) 是否有其他方法可以向组协调器发送心跳消息以保持会话处于活动状态?
c) session.timeout.ms 对保持消费者存活/活跃有什么影响吗?
a) 是的,如果您调用poll()
的时间不超过session.timeout.ms
Kafka 认为消费者已死的时间。
b) 作为替代方案,您可以poll()
在处理期间调用(即与处理交错)以触发心跳(并在每次“真实”轮询之前查找)。使用额外的处理线程也是可能的,允许主线程定期轮询以发送心跳。但是,您需要确保检测到处理线程上的故障(正确执行很难)!
c) 您可以增加超时值,但是,这可能不是您想要的,就好像您的使用者失败了,这种失败很晚才被检测到。
你描述的问题其实是已知的,未来消费者的行为可能会发生变化。已经有关于它的讨论。有关更多详细信息,请参阅KIP-62。
更新
由于 Kafka0.10.1
消费者有两个配置参数:max.poll.interval.ms
和session.timeout.ms
。第一个是两次连续轮询之间的最大时间,而第二个是心跳超时。心跳在一个额外的线程中发送,因此与poll()
现在调用分离。因此,增加max.poll.interval.ms
不会产生无法快速检测到整个客户端的故障(无心跳)的负面影响。
归档时间: |
|
查看次数: |
1952 次 |
最近记录: |