消费者“group_name”组正在永远重新平衡

swa*_*il7 6 apache-kafka kafka-consumer-api rebalancing spring-kafka

我使用的是卡夫卡:2.11-1.0.1。该应用程序包含主题“X”的并发度为 5 且分区为 5 的使用者。

当应用程序重新启动并且在分区分配之前在主题“X”上发布消息时,主题“X”的 5 个消费者找到组协调器并向组协调器发送加入组请求。预计会得到团体协调员的答复,但没有收到任何答复。

我已检查 Kafka 服务器日志,但找不到 DEBUG 日志级别的相关日志。

当我运行描述消费者组命令时,出现以下观察结果:

  1. 消费群体正在重新平衡
  2. 老消费者,有一定滞后性
  3. 具有一些随机名称的新消费者。随着时间的推移,新的消费者数量不断增加。

新消息发布在主题“X”上,但消费者没有收到。

heartbeat 和 session.time.out 设置为默认值。

如果消息在主题“X”及其使用者的分区分配之前发布,则会出现此问题。

我的疑问是:为什么重新平衡没有完成以便新的消费者开始消费新生成的消息?

swa*_*il7 7

应用程序在消费者组中具有以下消费者

  • 消费者A收听Topic1。Topic1 有 1 个分区。该消费者的 max.poll.interval.time.ms=4 小时。
  • 消费者B收听Topic2。Topic2有5个分区。消费者B并发数=5。max.poll.interval.time.ms=该消费者 1 小时。

应用程序重新启动时发生了什么以及主题之一是否已发布消息

  • 当应用程序重新启动时,会创建一个消费者实例 (consumerA1),并订阅 topic1。ConsumerA1找到组坐标(GC)并发送加入组请求。
  • ConsumerA1收到GC的响应并成为leader。直到这一步还没有其他consumer初始化。
  • ConsumerA1分配分区并向GC发送SyncGroup请求。新的任务生成发生。这样第一次rebalance就完成了。
  • topic1 上的消息已经发布,consumerA1 获取该消息并开始处理。处理完成此消息需要很长时间。(说2小时)
  • 现在5个consumer实例一一初始化,并且都订阅了topic2。这些消费者发现GC并发送加入组请求。但GC没有回应他们。
  • 当consumerA1向GC发送心跳时,GC响应正在进行重新平衡,但consumerA1并没有撤销分区,因为它正在处理消息。
  • 根据重新平衡协议(关于重新平衡的好文章),GC 会等待所有消费者发送加入组请求。在这种情况下,GC等待来自consumerA1的加入组请求。最长等待时间为 max.poll.interval.time.ms,即本例中的 4 小时。

根本原因:

应用程序重新启动后,组协调器没有等待所有消费者初始化,因此首先发生了不必要的重新平衡,因此消费者A1从分区中获取消息并开始处理它。

解决方案: 为了避免这种不必要的初始重新平衡,kafka提供了一种配置,其中组协调器等待消费者加入新的消费者组。文档

group.initial.rebalance.delay.ms

检查了我的 kafka server.properties ,它被设置为 0。尝试使用默认值,即 3 秒。避免了初始重新平衡,GC 在应用程序重新启动时等待 3 秒,此时所有其他消费者都已初始化。所有消费者都发送了加入组请求,因为所有 GC 都收到了来自所有消费者的请求。GC 没有任何延迟地响应,重新平衡进行并成功完成。