Raj*_*Raj 4 apache-kafka kafka-consumer-api
我试图了解以下两个融合消费者配置的默认值如何协同工作。
max.poll.interval.ms - 根据汇合文档,默认值为 300,000 毫秒
session.timeout.ms - 根据汇合文档,默认值为 10,000 毫秒
heartbeat.interval.ms - 根据汇合文档,默认值为 3,000 毫秒
假设我在配置中使用这些默认值。现在我有一个问题。
例如,假设消费者每 3,000 毫秒发送一次心跳,我的第一次轮询发生在时间戳 t1,然后第二次轮询发生在 t1 + 20,00 毫秒。那么它会因为超过“session.timeout.ms”而导致重新平衡吗?或者当消费者确实按照预期时间戳发送心跳时,它会正常工作吗?
在上一篇线程中,这里还解释了会话超时和最大轮询超时。我也解释一下我对此的理解。
ConsumerRecords poll(final long timeout): 用于从主题的分区中从最后消费的偏移量或手动设置的偏移量开始顺序获取数据。如果有可用记录,这将立即返回,否则它将等待超时。如果超时将返回空记录。轮询 API 不断调用以获取到达的任何新消息并确保消费者的活跃性。
session.timeout.ms在每次轮询期间,消费者协调器向代理发送心跳,以确保消费者的会话处于活动状态。如果代理在 session.timeout.ms 之前没有收到任何心跳,则代理将离开该消费者并进行重新平衡
您可以假设 session.timeout.ms 是代理等待从消费者那里获取心跳的最长时间,而 heartbeat.interval.ms 是消费者假设向代理发送心跳的预期时间。这解释了 heartbeat.interval.ms 总是小于 session.timeout.ms 因为理想情况是会话超时的 1/3。
max.poll.interval.ms:使用消费者组管理时调用 poll() 之间的最大延迟。这意味着消费者在获取更多记录之前将处于空闲最大时间。如果在此超时到期之前未调用 poll() ,则消费者被视为失败,并且组将通过调用 poll 来重新平衡,以便将分区重新分配给另一个消费者实例。如果我们正在进行长批处理,增加 max.poll.interval.ms 是有好处的,但请注意,增加此值可能会延迟组重新平衡,因为消费者只会在轮询调用中加入重新平衡。我们可以通过调整 max.poll.records 来保持较低的最大轮询间隔。
现在让我们讨论一下它们之间的关系。
消费者在调用 poll 时检查心跳,会话超时 poll 超时在后台如下方式:
轮询和重新平衡完成后,协调器检查会话超时,如果会话超时已过期而没有看到成功的心跳,旧协调器将断开连接,因此下一次轮询将尝试重新平衡。因此,会话超时直接依赖于时间协调器的活跃性,如果会话超时消费者协调器本身死亡并且调用 poll 将必须在重新平衡之前分配新的协调器。
会话超时后检查协调器验证heartbeat.pollTimeoutExpired,这意味着前台线程在调用 poll() 之间已停止,因此成员显式离开组并调用 poll 来加入新消费者而不是整个消费者组协调器。
正如共享链接中提到的,轮询与心跳无关,因此在轮询期间,如果轮询相当大,心跳仍然允许发送心跳,这确保您的线程处于活动状态,这意味着会话超时不会直接链接到 poll 。
session.timeout.ms:接收心跳的最长时间
max.poll.interval.ms:独立处理线程上的最大时间
因此,如果您将 max.poll.interval.ms 设置为 300,000,那么到下一次轮询将有 300,000 毫秒,这意味着消费者线程最多有 300,000 毫秒来完成处理。在heartbeat之间,heartbeat将继续在heartbeat.interval.ms(即3,000)发送心跳请求,以指示线程仍然处于活动状态,如果没有心跳,直到session.timeout.ms(即10,000协调器将死亡)并调用poll来重新分配新的协调器并重新平衡
| 归档时间: |
|
| 查看次数: |
8356 次 |
| 最近记录: |