Kafka 弹性 - 组协调员

Ant*_*Kim 5 java spring apache-kafka

据我了解,其中一位经纪人被选为负责消费者再平衡的组协调员。

Discovered coordinator host:9092 (id: 2147483646 rack: null) for group good_group
Run Code Online (Sandbox Code Playgroud)

我有 3 个节点,复制因子为 3 和 3 个分区。一切都很好,当我在非协调器节点上杀死 kafka 时,消费者仍在接收消息。

但是当我用协调器杀死那个特定的节点时,重新平衡没有发生,我的 Java 消费者应用程序没有收到任何消息。

2018-05-29 16:34:22.668 INFO  AbstractCoordinator:555 - Discovered coordinator host:9092 (id: 2147483646 rack: null) for group good_group.
2018-05-29 16:34:22.689 INFO  AbstractCoordinator:600 - Marking the coordinator host:9092 (id: 2147483646 rack: null) dead for group good_group
2018-05-29 16:34:22.801 INFO  AbstractCoordinator:555 - Discovered coordinator host:9092 (id: 2147483646 rack: null) for group good_group.
2018-05-29 16:34:22.832 INFO  AbstractCoordinator:600 - Marking the coordinator host:9092 (id: 2147483646 rack: null) dead for group good_group
2018-05-29 16:34:22.933 INFO  AbstractCoordinator:555 - Discovered coordinator host:9092 (id: 2147483646 rack: null) for group good_group.
2018-05-29 16:34:23.044 WARN  ConsumerCoordinator:535 - Auto offset commit failed for group good_group: Offset commit failed with a retriable exception. You should retry committing offsets. 
Run Code Online (Sandbox Code Playgroud)

我做错了什么,有没有办法解决这个问题?

Qua*_*ien 5

但是,当我使用协调器杀死该特定节点时,不会发生重新平衡,并且我的 java 消费者应用程序不会收到任何消息。

组协调器接收来自消费者组中所有消费者的心跳。它维护一个活跃消费者列表,并根据该列表的变化启动重新平衡。然后组长执行重新平衡活动。

这就是为什么如果你杀死组协调员,重新平衡就会停止。

更新

如果组协调器代理关闭,Zookeeper 将收到通知,并开始选举自动从活动代理中提升新的组协调器。所以与小组协调员无关。我们看一下日志:

2018-05-29 16:34:23.044 WARN  ConsumerCoordinator:535 - Auto offset commit failed for group good_group: Offset commit failed with a retriable exception. You should retry committing offsets.
Run Code Online (Sandbox Code Playgroud)

内部主题__consumer_offset的复制因子可能具有默认值1。您可以检查一下server.properties文件中default.replication.factoroffsets.topic.replication.factor的值是什么。如果默认值为1,则应将其更改为更大的值。如果不这样做,组协调器将关闭,导致偏移管理器停止而没有备份。因此提交偏移量的活动无法进行。