Kubernetes 上的 Kafka Streams:重新部署后的长期重新平衡

ped*_*sen 5 java apache-kafka kubernetes apache-kafka-streams

问题

我们使用 StatefulSet 在 Kubernetes 上部署 Scala Kafka Streams 应用程序。这些实例具有单独的applicationIds,因此它们每个都会复制完整的输入主题以实现容错。它们本质上是只读服务,仅读取状态主题并将其写入状态存储,并通过 REST 处理客户请求。这意味着,在任何给定时间,消费者组始终仅包含一个Kafka Streams 实例

现在我们的问题是,当触发滚动重启时,每个实例的启动时间大约需要5分钟,其中大部分时间都花在等待状态上REBALANCING我在这里读到,Kafka Streams 不会发送请求LeaveGroup以便在容器重新启动后快速返回,而无需重新平衡。为什么这对我们不起作用,为什么重新平衡需要这么长时间,即使是applicationId相同的?理想情况下,为了最大限度地减少停机时间,应用程序应立即从重新启动时离开的位置接管。

配置

以下是我们对默认值进行更改的一些配置:

properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000")
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), "300000")
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest")
// RocksDB config, see https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html
properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, classOf[BoundedMemoryRocksDBConfig])    
Run Code Online (Sandbox Code Playgroud)

问题/相关配置

  • 有助于减少session.timeout.ms吗?我们将其设置为相当大的值,因为 Kafka 代理位于不同的数据中心,并且网络连接有时不是超级可靠。
  • 这个答案建议减少max.poll.interval.ms,因为它与重新平衡超时有关。那是对的吗?我犹豫是否要更改此设置,因为它可能会对我们应用程序的正常操作模式产生影响。
  • 有人提到在部署期间延迟重新平衡的配置group.initial.rebalance.delay.ms- 但这也会在从崩溃中恢复后导致延迟,不是吗?
  • 我还偶然发现了KIP-345,它的目标是完全通过 消除消费者对静态会员资格的重新平衡group.instance.id,这非常适合我们的用户案例,但它似乎尚未在我们的经纪商上提供。

我对大量的配置以及如何使用它们来实现更新后的快速恢复感到困惑。谁能解释一下他们是怎么一起玩的?

Mat*_*Sax 5

您引用的另一个问题并没有说重新启动时可以避免重新平衡。不发送LeaveGroupRequest仅可以避免停止应用程序时的重新平衡。因此,重新平衡的次数从两次减少到一次。当然,对于您有些不寻常的单实例部署,您在这里不会获得任何东西(事实上,它可能实际上“伤害”您......)a

它有助于减少 session.timeout.ms 吗?我们将其设置为相当大的值,因为 Kafka 代理位于不同的数据中心,并且网络连接有时不是超级可靠。

可能会,具体取决于您重新启动应用程序的速度。(更多详细信息如下。)也许只是尝试一下(即,将其设置为 3 分钟,仍然具有较高的稳定性值,并看到重新平衡时间下降到 3 分钟?

这个答案建议减少 max.poll.interval.ms,因为它与重新平衡超时有关。那是对的吗?我犹豫是否要更改此设置,因为它可能会对我们应用程序的正常操作模式产生影响。

max.poll.interval.ms也会影响重新平衡时间(更多详细信息如下)。但是,默认值为 30 秒,因此不应导致 5 分钟的重新平衡时间。

提到了配置 group.initial.rebalance.delay.ms 来延迟部署期间的重新平衡 - 但这也会在从崩溃中恢复后导致延迟,不是吗?

这仅适用于空消费者组,默认值仅为 3 秒。所以它不应该影响你。

我还偶然发现了 KIP-345,它的目标是完全通过 group.instance.id 消除消费者对静态成员资格的重新平衡,这非常适合我们的用户案例,但它似乎尚未在我们的经纪商上提供。

使用静态组成员资格实际上可能是最好的选择。也许值得升级您的经纪商以获得此功能。

session.timeout.ms顺便说一句,和之间的区别max.poll.interval.ms在另一个问题中进行了解释:Difference between session.timeout.ms and max.poll.interval.ms for Kafka 0.10.0.0 and later versions

一般来说,经纪人端组协调器维护每个“组生成”的所有成员的列表。如果成员主动离开群组(通过发送LeaveGroupRequest)、超时(通过session.timeout.msmax.poll.interval.ms)或新成员加入群组,则会触发重新平衡。如果发生重新平衡,每个成员都有机会重新加入该组以纳入下一代。

对于您的情况,该组只有一名成员。当您停止应用程序时,不会LeaveGroupRequest发送任何消息,因此组协调员只有在session.timeout.ms通过后才会删除该成员。

如果您重新启动应用程序,它将作为“新”成员返回(从组协调员的角度来看)。这将触发重新平衡,为该组的所有成员提供更改以重新加入该组。对于您的情况,“旧”实例可能仍在组中,因此只有在组协调员从组中删除旧成员后,重新平衡才会继续进行。问题可能是,组协调员认为组从一个成员扩展到两个成员......(这就是我上面的意思:如果LeaveGroupRequest发送 a,当您停止应用程序时,组将变为空,并且重新启动时,只有新成员才会加入该组,并且重新平衡将立即进行。)

使用静态组成员身份可以避免该问题,因为在重新启动时,实例可以重新标识为“旧”实例,并且组协调器不需要等待旧组成员过期。