当我注意到一些奇怪的行为时,我正在对一个老话题进行一些测试.阅读Kafka的日志我注意到这个"删除了8个过期的偏移"消息:
[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 37 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 37 (kafka.coordinator.GroupCoordinator)
Deleting segment 0 from log __consumer_offsets-31. (kafka.log.Log)
Deleting segment 0 from log __consumer_offsets-45. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-45/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-31/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-13. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-13/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-11. (kafka.log.Log)
Deleting segment 4885 from log __consumer_offsets-11. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000004885.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-26. (kafka.log.Log)
Deleting segment 12406 from log __consumer_offsets-26. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000012406.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-22. (kafka.log.Log)
Deleting segment 8643 from log __consumer_offsets-22. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000008643.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-6. (kafka.log.Log)
Deleting segment 9757 from log __consumer_offsets-6. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000009757.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-14. (kafka.log.Log)
Deleting segment 1 from log __consumer_offsets-14. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000001.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
[GroupCoordinator 1001]: Preparing to restabilize group GROUP_NAME with old generation 37 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 38 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 38 (kafka.coordinator.GroupCoordinator)
[Group Metadata Manager on Broker 1001]: Removed 8 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
Run Code Online (Sandbox Code Playgroud)
事实上,我有两个问题:
1)此抵消到期如何适用于消费者群体?
2)这个过期的偏移量可以解释这种行为吗?我的消费者在有auto.offset.reset = latest时不会轮询任何东西,但是当它有auto.offset.reset = earliest时它从最后一个提交的偏移量中进行了轮询?
Mat*_*Sax 45
默认情况下,Kafka会在一段可配置的时间后删除已提交的偏移量.参见参数offsets.retention.minutes.即,如果消费者组在这段时间内处于非活动状态(即,不提交任何偏移),则偏移量将被删除.因此,即使消费者正在运行,如果它没有为某些分区提交偏移,那么这些偏移也会受到影响offset.retention.minutes.
如果您启动消费者,则会发生以下情况:
auto.offset.reset参数重置偏移量因此,如果您的偏移量被删除,并且auto.offset.reset = latest在将新数据添加到主题之前,您的消费者将不会轮询任何内容.如果auto.offset.reset = earliest它应该消耗整个主题.
有关此https://issues.apache.org/jira/browse/KAFKA-3806和https://issues.apache.org/jira/browse/KAFKA-4682的讨论,请参阅此JIRA
| 归档时间: |
|
| 查看次数: |
15407 次 |
| 最近记录: |