卡夫卡消费者提交失败异常

Bha*_*ath 4 java apache-kafka kafka-consumer-api

我正在研究 kafka 消费者程序。最近我们将其部署在 PROD 环境中。在那里,我们遇到了如下问题:

[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - No. of records fetched: 1
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Group coordinator opl-kafka-prd2-01:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Discovered group coordinator opl-kafka-prd2-01:9092 (id: 2147483644 rack: null)
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Attempt to heartbeat failed for since member id consumer-otm-opl-group-1-953dfa46-9ced-472f-b24f-36d78c6b940b is not valid.
[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - Batch start offset: 9329428
[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - Batch Processing Successful.
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Failing OffsetCommit request since the consumer is not part of an active group
Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:936)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1387)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1349)
    at com.cisco.kafka.consumer.RTRKafkaConsumer.main(RTRKafkaConsumer.java:72)
Run Code Online (Sandbox Code Playgroud)

我的理解是当组协调器不可用并被重新发现时,心跳间隔(根据文档为 3 秒)到期并且消费者被踢出组。这样对吗?。如果是这样,应该如何解决这个问题?如果我错了,请帮助我理解这个问题,并提出你必须解决这个问题的任何想法。如果需要,我可以共享代码。

mik*_*ike 11

您所指的异常

Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
Run Code Online (Sandbox Code Playgroud)

提示正在发生的事情以及可以采取哪些措施来解决问题。在代码中,这个异常被描述为

“当使用 KafkaConsumer#commitSync() 进行偏移提交失败并出现不可恢复的错误时,会引发此异常。当在成功应用提交之前组重新平衡完成时,可能会发生这种情况。在这种情况下,通常无法重试提交,因为某些分区可能已分配给组中的另一个成员。”

根据我的经验,抛出的错误消息可能是由不同的事情引起的,尽管它们都与不再分配给分区的使用者有关:

  1. 在不关闭消费者的情况下创造越来越多的消费者
  2. 投票超时
  3. 心跳超时
  4. 过时的 Kerberos 票证

1. 在不关闭的情况下打开越来越多的消费者

如果您将消费者添加到现有 ConsumerGroup,则会发生重新平衡。因此,必须在使用后关闭消费者或始终使用相同的实例,而不是为每个消息/迭代创建新的 KafkaConsumer 对象。

2. 轮询超时(如错误消息中所述):

[...] 对 poll() 的后续调用之间的时间比配置的要长max.poll.interval.ms,这通常意味着轮询循环花费了太多时间处理消息。

配置max.poll.interval.ms默认为300000ms5minutes。由于您的消费者花费的时间超过这 5 分钟,因此消费者被视为失败,组将重新平衡以将分区重新分配给另一个成员(请参阅消费者配置)。

轮询超时的解决方案:

错误信息中也给出了一个可能的解决方案

您可以通过使用 增加max.poll.interval.ms或减少 poll() 中返回的批次的最大大小来解决此问题max.poll.records

消费者再次读取所有消息,因为(如错误所示)它无法提交偏移量。这意味着,如果您以相同的方式启动消费者,group.id它会认为它从未从该主题中读取任何内容。

3.心跳超时

KafkaConsumer 中有两个主要配置用于处理心跳:heartbeat.interval.mssession.timeout.ms.

在单独的后台线程中,您的 KafkaConsumer 会定期向服务器发送心跳。如果消费者在 session.timeout.ms 时间内崩溃或无法发送心跳,则消费者将被视为死亡,其分区将被重新分配。如果触发了重新平衡,您的使用者将无法从“旧分配”分区提交任何内容,如 CommitFailedException 的描述中所述:“当组重新平衡在成功应用提交之前完成时,可能会发生这种情况。”

心跳超时的解决方法:

增加设置heartbeat.interval.mssession.timeout.ms遵循以下建议:“heartbeat.interval.ms必须设置为低于session.timeout.ms,但通常不应设置为高于该值的 1/3。”

请记住,更改这些值总是需要权衡。你有

  • 更频繁的重新平衡但更短的反应时间来识别死消费者或
  • 不太频繁的重新平衡和更长的反应时间来识别死消费者。

4. 过时的 Kerberos 票证

在我们的生产集群上,我们刚刚在应用程序无法更新 Kerberos 票证后看到 CommitFailedException。