如果 auto.offset.reset=earliest 但主题没有消息,将设置什么消费者偏移量

Ans*_*ngh 5 apache-kafka kafka-consumer-api

我有 Kafka 服务器版本 2.4 并设置log.retention.hours=168(这样主题中的消息将在 7 天后被删除)和auto.offset.reset=earliest(这样如果消费者没有得到最后一次提交offset 那么它应该从头开始处理)。由于我使用的是 Kafka 2.4 版本,因此默认值offsets.retention.minutes=10080(因为我没有在我的应用程序中设置这个属性)。

我的话题数据是:1,2,3,4,5,6,7,8,9,10

关闭消费者之前的当前消费者偏移量:10

结束偏移:10

消费者最后提交的偏移量:10

因此,假设我的消费者在过去 7 天内没有运行,而我在第 8 天启动了消费者。所以我最后一次提交的消费者偏移将过期(由于offsets.retention.minutes=10080属性)并且主题消息也将被删除(由于log.retention.hours=168属性)。

所以想知道现在auto.offset.reset=earliest属性将设置什么消费者偏移量?

mik*_*ike 2

尽管 Kafka 主题中没有可用数据,但您的代理仍然知道该分区内的“下一个”偏移量。在您的情况下,该主题的第一个和最后一个偏移量是10,而它不包含任何数据。

因此,已经提交偏移量 10 的消费者在再次启动时将尝试读取 11,而与消费者配置无关auto.offset.reset

当您的主题有偏移量时,例如,直到 15,而消费者在提交偏移量 10 后关闭,您的示例会变得更加有趣。现在,想象一下由于保留策略,所有偏移量都从主题中删除。如果您随后仅启动消费者,那么消费者配置auto.offset.reset就会生效,如文档中所述:

“当 Kafka 中没有初始偏移量或者当前偏移量在服务器上不再存在时该怎么办(例如,因为该数据已被删除)

只要 Kafka 主题为空,就不会为消费者“设置”偏移量。消费者只是试图找到下一个可用的偏移量,或者基于

  • 最后提交的偏移量或者,
  • 如果最后提交的偏移量不再存在,则通过auto.offset.reset.

正如附加说明:即使消息似乎已被保留策略清理,您仍可能会在主题中看到一些数据,因为即使在保留时间/大小之后,数据仍保留在 Kafka 主题中