在 Kafka 客户端(消费者/生产者)宕机后恢复它们

You*_*sef 4 apache-kafka spring-boot spring-kafka

在我工作的公司,我们Spring for Kafka在没有身份验证的情况下使用,最近我们做了一些实验来在 Kafka 中设置安全性,并且我们在短时间内启用了身份验证,这导致我们的微服务中的所有消费者/生产者都崩溃了!(微服务保持运行)

\n

例外情况:

\n
Authorization Exception and no authorizationExceptionRetryInterval set\n\norg.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: foo-group\n
Run Code Online (Sandbox Code Playgroud)\n

经过一些研究,我们发现这是 kafka 客户的预期行为,我们需要设置authorizationExceptionRetryInterval属性

\n

公共无效 setAuthorizationExceptionRetryInterval\xe2\x80\x8b(java.time.DurationauthorizationExceptionRetryInterval)

\n
\n

设置 KafkaConsumer 抛出 AuthorizationException 后\n重试之间的间隔。默认情况下,该字段为空,并且\n禁用重试。在这种情况下,容器将被停止。间隔\n必须小于 max.poll.interval.ms 消费者属性。

\n
\n

这是一些其他有用的链接

\n

为Spring Kafka设置authorizationExceptionRetryInterval

\n

为什么Spring KafkaConsumer在授权失败时会暂停n个主题的所有消费

\n

我想知道的是:

\n
    \n
  1. 身份验证失败是消费者/生产者出现故障时的唯一情况吗?
  2. \n
  3. 如果还有其他情况,如何确保我们的消费者/生产者在没有人为干预(重新启动微服务)的情况下恢复?换句话说,如何检查\n消费者/生产者是否已启动并重新启动它们?
  4. \n
\n

Gar*_*ell 6

仅在以下情况下停止容器:

  • AuthorizationException没有authorizationExceptionRetryInterval
  • NoOffsetForPartitionException- 当ConsumerConfig.AUTO_OFFSET_RESET_CONFIG不是earliestlatest且该消费者组的分区没有现有偏移量时抛出。
  • FencedInstanceIdException- 使用事务和静态组成员(意味着某些其他实例正在使用此实例 ID)。
  • StopAfterFenceException- 当stopContainerWhenFenced为真时(默认为假) - 仅适用于交易
  • 任何Error(例如 OOME)

  • 您可以添加“ApplicationEventListener” bean 或“@EventListener”方法并侦听“ConsumerStoppedEvent”或“ContainerStoppedEvent”,然后根据需要关闭整个服务。或者您可以添加逻辑来尝试定期重新启动容器。请参阅 https://docs.spring.io/spring-kafka/docs/current/reference/html/#events 和 https://docs.spring.io/spring-kafka/docs/current/reference/html/#kafkalistener -生命周期 (2认同)