How do Apache Kafka consumer group offsets expire?

Enz*_*nzo 27 apache-kafka

I was running some tests on an old topic when I noticed some strange behavior. Reading Kafka's logs, I noticed this "Removed 8 expired offsets" message:

[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 37 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 37 (kafka.coordinator.GroupCoordinator)
Deleting segment 0 from log __consumer_offsets-31. (kafka.log.Log)
Deleting segment 0 from log __consumer_offsets-45. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-45/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-31/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-13. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-13/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-11. (kafka.log.Log)
Deleting segment 4885 from log __consumer_offsets-11. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000004885.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-26. (kafka.log.Log)
Deleting segment 12406 from log __consumer_offsets-26. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000012406.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-22. (kafka.log.Log)
Deleting segment 8643 from log __consumer_offsets-22. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000008643.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-6. (kafka.log.Log)
Deleting segment 9757 from log __consumer_offsets-6. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000009757.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-14. (kafka.log.Log)
Deleting segment 1 from log __consumer_offsets-14. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000001.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
[GroupCoordinator 1001]: Preparing to restabilize group GROUP_NAME with old generation 37 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 38 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 38 (kafka.coordinator.GroupCoordinator)
[Group Metadata Manager on Broker 1001]: Removed 8 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)

Actually, I have two questions:

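1) How does this offset expiration work for consumer groups?

2) Can these expired offsets explain the following behavior? My consumer does not poll anything when it has auto.offset.reset = latest, but when it has auto.offset.reset = earliest it polls from the last committed offset?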

Mat*_*Sax 45

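By default, Kafka deletes committed offsets after a configurable period of time; see the parameter offsets.retention.minutes. That is, if a consumer group is inactive (i.e., it does not commit any offsets) for this amount of time, its offsets are deleted. Thus, even if the consumer is running, if it does not commit offsets for some partitions, those offsets are subject to offsets.retention.minutes.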
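A minimal sketch of the expiration rule described above, assuming each committed offset carries its own commit timestamp and is dropped once it has not been re-committed for longer than offsets.retention.minutes. `CommittedOffset` and `expire_offsets` are hypothetical names for illustration, not broker code. The example shows how a running consumer can still lose the offset of a partition it has stopped committing to.

```python
from dataclasses import dataclass


@dataclass
class CommittedOffset:
    # Hypothetical model of one committed offset for a (topic, partition).
    offset: int
    commit_time_ms: int


def expire_offsets(offsets, now_ms, retention_minutes):
    """Split offsets into (kept, expired) under the assumed per-offset rule.

    An offset expires when it is older than retention_minutes, whether or
    not the consumer that owns it is still running.
    """
    retention_ms = retention_minutes * 60 * 1000
    kept, expired = {}, []
    for partition, committed in offsets.items():
        if now_ms - committed.commit_time_ms > retention_ms:
            expired.append(partition)
        else:
            kept[partition] = committed
    return kept, expired


if __name__ == "__main__":
    day_ms = 24 * 60 * 60 * 1000
    offsets = {
        ("topic", 0): CommittedOffset(offset=120, commit_time_ms=0),      # not re-committed
        ("topic", 1): CommittedOffset(offset=75, commit_time_ms=day_ms),  # committed recently
    }
    kept, expired = expire_offsets(offsets, now_ms=day_ms + 1000, retention_minutes=1440)
    print("expired:", expired)       # [('topic', 0)]
    print("kept:", sorted(kept))     # [('topic', 1)]
```

Here 1440 minutes (24 hours) is only an illustrative retention value; the actual default depends on the broker version.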

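If you start a consumer, the following happens:

  1. Look for (valid) committed offsets (for the consumer group)
    1. If valid offsets are found, resume from there
    2. If no valid offsets are found, reset the offsets according to the auto.offset.reset parameter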
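The startup steps above can be sketched as a small function. This is a simplified model, not the consumer client's code: `resolve_start_offset` is a hypothetical name, and it assumes a committed offset counts as "valid" only if it still lies between the partition's log start and log end offsets. auto.offset.reset also accepts "none", for which the real client throws an exception; the sketch mimics that with a `ValueError`.

```python
def resolve_start_offset(committed, log_start, log_end, auto_offset_reset):
    """Return the offset a consumer starts fetching from for one partition.

    committed is the group's committed offset, or None if it was never
    committed or has expired.
    """
    # Step 1.1: a valid committed offset exists, so resume from it.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # Step 1.2: no valid committed offset, so apply auto.offset.reset.
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    # "none": the real client raises an error instead of picking a position.
    raise ValueError(f"no valid committed offset and auto.offset.reset={auto_offset_reset!r}")


if __name__ == "__main__":
    print(resolve_start_offset(42, 0, 100, "latest"))      # 42: committed offset wins
    print(resolve_start_offset(None, 0, 100, "latest"))    # 100: end of the log
    print(resolve_start_offset(None, 0, 100, "earliest"))  # 0: start of the log
```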

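Thus, if your offsets were deleted and auto.offset.reset = latest, your consumer will not poll anything until new data is added to the topic. If auto.offset.reset = earliest, it should consume the entire topic.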
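To make the two outcomes concrete, here is a toy simulation of a consumer whose committed offset is gone. `Partition` and `start_position` are hypothetical stand-ins, not Kafka APIs, and the sketch assumes no records have been removed by retention yet, so record `i` simply has offset `i`.

```python
class Partition:
    """Hypothetical in-memory partition; record i has offset i."""

    def __init__(self, records):
        self.records = list(records)

    @property
    def log_start(self):
        return 0

    @property
    def log_end(self):
        # Offset that the next appended record will get.
        return len(self.records)

    def append(self, record):
        self.records.append(record)

    def poll(self, position):
        """Return the records from `position` onward and the new position."""
        batch = self.records[position:]
        return batch, position + len(batch)


def start_position(partition, auto_offset_reset):
    # No committed offset exists (e.g. it expired), so the reset policy decides.
    if auto_offset_reset == "earliest":
        return partition.log_start
    if auto_offset_reset == "latest":
        return partition.log_end
    raise ValueError(auto_offset_reset)


if __name__ == "__main__":
    topic = Partition(["a", "b", "c"])
    latest_pos = start_position(topic, "latest")
    earliest_pos = start_position(topic, "earliest")
    print(topic.poll(latest_pos)[0])    # []: nothing until new data arrives
    print(topic.poll(earliest_pos)[0])  # ['a', 'b', 'c']: the whole topic
    topic.append("d")
    print(topic.poll(latest_pos)[0])    # ['d']
```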

See these JIRAs for a discussion of this: https://issues.apache.org/jira/browse/KAFKA-3806 and https://issues.apache.org/jira/browse/KAFKA-4682

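  • This is not necessarily true. If you set enable.auto.commit = false and there is no new data (so nothing gets committed), the committed offsets will expire. (3 upvotes)
  • @DmitryMinkovsky Yes, that is correct. There is a corresponding JIRA: https://issues.apache.org/jira/browse/KAFKA-5510 (3 upvotes)
  • This currently affects Kafka Streams, which sets `enable.auto.commit = false` and uses `auto.offset.reset = earliest`. By default, if a Kafka Streams application does not process any data for 24 hours and is then restarted, its offsets have been deleted and it reprocesses the data from the beginning. (2 upvotes)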
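The Kafka Streams scenario from the last comment can be replayed as a timeline. This is a hypothetical model, not Streams or broker code: it assumes a 24-hour retention (matching the default the comment mentions), assumes the application only commits after processing new records, and uses `auto.offset.reset = earliest` as the comment states. `GroupOffsets` and `position_after_restart` are illustrative names.

```python
HOUR_MS = 60 * 60 * 1000
RETENTION_MS = 1440 * 60 * 1000  # assumption: offsets.retention.minutes = 1440 (24 h)


class GroupOffsets:
    """Hypothetical model of one group's committed offset for one partition."""

    def __init__(self):
        self.offset = None
        self.commit_time_ms = None

    def commit(self, offset, now_ms):
        self.offset = offset
        self.commit_time_ms = now_ms

    def fetch_committed(self, now_ms):
        # Assumed expiry rule: drop the offset once it has not been
        # re-committed for longer than the retention period.
        if self.offset is not None and now_ms - self.commit_time_ms > RETENTION_MS:
            self.offset = None
            self.commit_time_ms = None
        return self.offset


def position_after_restart(group, now_ms, log_start=0):
    # With auto.offset.reset = earliest, a missing offset means "start of log".
    committed = group.fetch_committed(now_ms)
    return committed if committed is not None else log_start


if __name__ == "__main__":
    # 500 records processed and committed at t = 0; after that no new data
    # arrives, so nothing is committed again.
    group = GroupOffsets()
    group.commit(500, now_ms=0)
    print(position_after_restart(group, now_ms=12 * HOUR_MS))  # 500: resumes
    print(position_after_restart(group, now_ms=25 * HOUR_MS))  # 0: reprocesses everything
```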