"重新平衡"在Apache Kafka上下文中意味着什么?

Jef*_*ong 39 apache-kafka kafka-consumer-api

我是Kafka的新用户,现在已经试用了大约2-3周.我相信目前我已经很好地理解了Kafka在大多数情况下的工作原理,但是在尝试为我自己的Kafka消费者设计API之后(这是模糊不清的但是我遵循了新的KafkaConsumer应该遵循的准则可用于v 0.9,它出现在'trunk'repo atm上)如果我有多个具有相同groupID的消费者,我就会从主题中消耗延迟问题.

在此设置中,我的控制台始终记录有关"重新平衡触发"的问题.当我向消费者群体添加新的消费者时,是否会发生重新平衡,并且为了找出同一个群组ID中的哪个消费者实例将获得哪些分区或完全用于其他内容的重新平衡而触发它们?

我也从https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design中看到了这段话,我似乎无法理解它,所以如果有人能帮助我做感觉非常感激:

重新平衡是一组消费者实例(属于同一组)协调以拥有该组订阅的互斥主题分区集的过程.在成功完成消费者组的重新平衡操作结束时,所有订阅主题的每个分区都将由该组中的单个消费者实例拥有.重新平衡的工作方式如下.每个经纪人都被选为消费者群体子集的协调者.组的协调代理负责协调有关订阅主题的使用者组成员身份更改或分区更改的重新平衡操作.它还负责将生成的分区所有权配置传递给正在进行重新平衡操作的组的所有使用者.

Geo*_*vis 45

当新的消费者加入消费者群体时,该组消费者尝试"重新平衡"负载以将分区分配给每个消费者.如果在进行此分配时消费者集合发生更改,则重新平衡将失败并重试.此设置控制放弃前的最大尝试次数.

对此的命令是:rebalance.max.retries,默认设置为4.

此外,如果以下情况属实,可能会发生这种情况:

ZooKeeper会话超时.如果消费者在这段时间内没有心跳到ZooKeeper,那么它被认为已经死亡并且将发生重新平衡.

希望这可以帮助!

  • 另一个潜在问题是在同一个使用者连接器中使用多个主题时.在内部,每个主题都有一个内存中队列,它为消费者迭代器提供信息.每个代理都有一个提取器线程,它为所有主题发出多次获取请求.fetcher线程迭代所获取的数据并尝试将不同主题的数据放入其自己的内存中队列中.如果其中一个消费者很慢,最终其相应的内存中队列将被填满.因此,fetcher线程将阻止将数据放入该队列. (6认同)
  • 在该队列具有更多空间之前,不会将任何数据放入队列中以用于其他主题.因此,那些其他主题,即使它们的数量较少,其消费也会因此而延迟.要解决此问题,要么确保所有消费者都能跟上,要么使用单独的消费者连接器来处理不同的主题.对不起,这是一个很长的回复,由于某种原因不得不将它堆叠在三个线程....希望有所帮助! (6认同)
  • 杰夫,我很高兴!我认为这个问题可能正在发生,因为主题分区是在同一个消费者群体中的消费者之间分发消息的最小单位.因此,如果消费者数量大于Kafka集群中的分区总数(跨所有代理),则某些消费者将永远不会获得任何数据.解决方案是增加代理上的分区数. (3认同)

vor*_*lex 31

消费者组中的每个消费者都被分配一个或多个主题分区,而Rebalance是消费者之间的分区所有权的重新分配.

一个再平衡发生在:

  • 消费者加入该组
  • 消费者干净利落地走下去
  • 组协调员认为消费者是DEAD.这可能发生在崩溃之后或消费者忙于长时间运行的处理时,这意味着消费者在配置的会话间隔内没有同时向组协调器发送心跳
  • 添加了新分区


作为集团协调员(集群中的经纪人之一)和集团领导者(加入集团的第一个消费者),为消费者群体指定, Rebalance可以或多或少地描述如下:

  • 领导者从组协调器接收组中所有消费者的列表 (这将包括最近发送心跳并且因此被认为是活动的所有消费者)并且负责为每个消费者分配分区的子集.
  • 在决定分区分配(Kafka有几个内置分区分配策略)后,组长将分配列表发送给组协调器,组协调器将此信息发送给所有使用者.


这适用于Kafka 0.9,但我很确定新版本仍然有效.


sun*_*007 11

消费者重新平衡决定哪个消费者负责某些主题的所有可用分区的哪个子集。例如,您可能有一个包含 20 个分区和 10 个消费者的主题;在重新平衡结束时,您可能希望每个使用者都从 2 个分区中读取数据。如果您关闭其中 10 个使用者,您可能希望在重新平衡完成后每个使用者都有 1 个分区。消费者重新平衡是一种动态分区分配,可以由 Kafka 自动处理。

组协调器是负责与消费者通信以实现消费者之间重新平衡的代理之一。在早期版本中,Zookeeper 存储元数据详细信息,但最新版本则存储在代理上。消费者协调器接收来自消费者组的所有消费者的心跳和轮询,因此要了解每个消费者的心跳并管理他们在分区上的偏移量。

组长: 消费者组中的一个作为组长,由组协调器选择,负责代表组中的所有消费者做出分区分配决策。

再平衡场景:

  1. 消费者组订阅任何主题

  2. 消费者实例无法发送具有 session.heart.beat 时间间隔的心跳。

  3. 消费者长进程超过轮询超时

  4. 消费者组的消费者通过异常

  5. 添加了新分区。

  6. 扩大和缩小消费者。添加新的消费者或手动删除现有的消费者

消费者再平衡

当消费者请求加入组或离开组时启动消费者重新平衡。Group Leader 从 Group Coordinator 收到所有活跃消费者的列表。组长使用 PartitionAssigner 决定分配给每个消费者的分区。一旦 Group Leader 完成分区分配,它会将分配列表发送给 Group Coordinator,后者将此信息发送回所有消费者。Group 只将适用的分区发送给他们的消费者,而不是其他消费者分配的分区。只有组长知道所有消费者及其分配的分区。重新平衡完成后,消费者开始向 Group Coordinator 发送 Heartbeat,表明它处于活动状态。消费者向组协调器发送 OffsetFetch 请求,以获取其分配的分区的最后提交的偏移量。

状态管理

在重新平衡时,组协调器将其状态设置为重新平衡并等待所有消费者重新加入组。

当组开始重新平衡时,组协调器首先将其状态切换为重新平衡,以便通知所有交互的消费者重新加入组。一旦重新平衡完成组协调器创建新的代 ID 并通知所有消费者和组继续同步阶段,消费者发送同步请求并等待组领导完成新分配分区的生成。一旦消费者收到新分配的分区,他们就会进入稳定阶段。

在此处输入图片说明

静态成员资格

这种重新平衡是一项非常繁重的操作,因为它需要停止所有消费者并等待获得新分配的分区。在每次重新平衡时总是创建新的代 ID 意味着刷新所有内容。为了解决这个开销,Kafka 2.3+ 引入了静态成员资格来减少不必要的重新平衡。KIP-345

在静态成员身份中,消费者状态将持续存在,而在重新平衡时,将应用相同的分配。它使用一个新的 group.instance.id 来保持成员身份。因此,即使在最坏的情况下,成员 id 会重新洗牌以分配新分区,但仍然相同的消费者实例 ID 将获得相同的分区分配

instanceId: A, memberId: 1, assignment: {0, 1, 2}
instanceId: B, memberId: 2, assignment: {3, 4, 5}
instanceId: C, memberId: 3, assignment: {6, 7, 8}
Run Code Online (Sandbox Code Playgroud)

并在重新启动后:

instanceId: A, memberId: 4, assignment: {0, 1, 2}
instanceId: B, memberId: 2, assignment: {3, 4, 5}
instanceId: C, memberId: 3, assignment: {6, 7, 8}   
Run Code Online (Sandbox Code Playgroud)

参考:

  1. https://www.confluent.io/blog/kafka-rebalance-protocol-static-membership

  2. https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances