卡夫卡消费者错误:标记协调员死亡

Sha*_*s88 8 apache-kafka

我在Kafka 0.10.0.1集群中有一个包含10个分区的主题.我有一个产生多个消费者线程的应用程序.对于这个主题,我产生了5个线程.在我的应用程序日志中,我多次看到此条目

INFO :: AbstractCoordinator:600 - Marking the coordinator x.x.x.x:9092
(id:2147483646 rack: null) dead for group notifications-consumer
Run Code Online (Sandbox Code Playgroud)

然后有几个条目说(Re-)joining group notifications-consumer. Afterwards我也看到一个警告说

Auto commit failed for group notifications-consumer: Commit cannot be completed since
the group has already rebalanced and assigned the partitions to another member. This means
that the time between subsequent calls to poll() was longer than the configured
max.poll.interval.ms, which typically implies that the poll loop is spending too much time 
message processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned by poll() with max.poll.records.
Run Code Online (Sandbox Code Playgroud)

现在我已经调整了我的消费者配置

props.put("max.poll.records", 200);
props.put("heartbeat.interval.ms", 20000);
props.put("session.timeout.ms", 60000);
Run Code Online (Sandbox Code Playgroud)

所以,即使在正确调整配置后,我仍然会收到此错误.在重新平衡期间,我们的应用程序完全没有响应.请帮忙.

gab*_*ssi 6

使用session.timeout.ms您仅控制由于心跳引起的超时,这意味着session.timeout.ms自上一次心跳以来已经过去了毫秒,并且群集将您声明为死节点并触发重新平衡。

KIP-62之前,心跳是在轮询中发送的,但现在移至特定的后台线程,以避免在session.timeout.ms呼叫比调用另一个时间多的情况下从集群中退出poll()。将脉动信号分离到特定线程会使处理与告诉集群您已启动并正在运行分离开来,但这会带来“活锁”情况的风险,在这种情况下,进程处于活动状态,但没有进展,因此除了使脉动信号独立之外的的poll一个引入新的超时,以确保消费者还活着,并正在取得进展。该文档说明了有关KIP-62之前的实现的这些信息:

只要使用者发送心跳,它就基本上在分配给它的分区上保持锁定。如果该过程已经无法使用,但仍无法继续发送心跳信号,则该过程将不复存在,则该组中的其他成员将无法接管分区,这将导致延迟增加。但是,心跳和处理都在同一线程中完成,这一事实保证了消费者必须在保持其分配上取得进展。任何影响处理的停顿也会影响心跳。

KIP-62引入的更改包括:

解耦处理超时:我们建议引入一个单独的本地强制超时来进行记录处理,并引入一个后台线程来保持会话处于活动状态,直到该超时到期为止。我们将此新超时称为“进程超时”,并在使用者的配置中将其公开为max.poll.interval.ms。此配置将客户端调用之间的最大延迟设置为poll()

从您发布的日志中,我认为您可能处于这种情况,您的应用需要花费更多的时间max.poll.interval.ms(默认为5分钟)来处理200条轮询记录。如果您处于这种情况,则只能减小max.poll.records或增大max.poll.interval.ms

PD:

max.poll.interval.ms日志中显示的配置(至少)来自kafka 0.10.1.0,因此我认为您在此处犯了一些错误。

更新资料

如果我理解您的意思不对,请纠正我,但是在您的最后一条评论中,您说的是您正在创建25个使用者(例如,org.apache.kafka.clients.consumer.KafkaConsumer如果使用Java,则为25个),并将他们使用N个不同的主题,但使用的主题相同group.id。如果正确,那么每次KafkaConsumer启动或停止a时,您都会看到重新平衡,因为它将发送一个JoinGroupLeaveGroup消息(请参阅相应的kafka协议),其中包含group.idmember.idmember.id不是主机,因此在同一进程中创建的两个使用者仍将具有不同的ID)。请注意,这些消息不包含主题订阅信息(尽管该信息应位于代理中,但kafka不会将其用于重新平衡)。因此,每次集群收到a JoinGroup或aLeaveGroup对于group.idX,它将触发所有具有相同group.id X的消费者的重新平衡。

如果以相同的方式开始使用25个使用者,group.id则将看到重新平衡,直到创建了最后一个使用者,并且相应的重新平衡结束为止(如果继续看到,则可能正在停止使用者)。

几个月前我遇到了这个问题

如果我们有两个使用相同的group.id的KafkaConsumer(在同一进程或两个不同的进程中运行)并且其中一个已关闭,则即使其他KafkaConsumer订阅了不同的主题,也会触发另一个KafkaConsumer的重新平衡。我想经纪人必须只考虑group.id进行重新平衡,而不考虑与LeaveGroupRequest对(group_id,member_id)相对应的预订主题,但是我想知道这是否是预期的行为,或者这应该是正确的有待改善吗?我想这可能是避免代理中更复杂的重新平衡并且考虑到该解决方案非常简单的第一选择,即,即使订阅了不同主题的不同KafkaConsumer,即使它们在同一进程中运行,也可以对其使用不同的组ID。


当发生重新平衡时,我们会看到重复的消息

这是一种特殊的行为,一个使用者使用了该消息,但是在提交偏移量之前触发了重新平衡,并且提交失败。重新平衡完成后,具有该主题分配的过程将再次使用该消息(直到提交成功)。

我分成两组,现在问题从过去2个小时突然消失了。

您在这里遇到了麻烦,但是如果您不想看到任何(不可避免的)重新平衡,则应该group.id对每个主题使用不同的名称。

这是有关不同的重新平衡方案的精彩演讲