Kafka的__consumer_offsets中的消息时间戳和commit_timestamp

KWe*_*Wer 5 apache-kafka kafka-consumer-api

我目前正在开发一个部分依赖于 Apache Kafka(版本 2.2.0)的应用程序。我必须做的一件事是跟踪其他消费者提交当前抵消的内容(更重要的是何时)。据我所知,仅使用 Java 客户端,无法获取已提交偏移量的关联时间戳,因为AdminClientlistConsumerGroupOffsets方法最终会生成一个OffsetAndMetadata不包含时间戳的对象。因此,我只是开始阅读该__consumer_offsets主题的消息。如果有更好的方法来做到这一点,请告诉我。

现在,如果直接读取消息__consumer_offsets,那么突然就有两个时间戳。一个是附加到实际提交消息的时间戳,另一个是commit_timestamp,它是消息内容的一部分。我的第一个想法是,其中一个可能是由代理设置的,另一个可能是由提交它的客户端设置的(另外,如果您/config/topics/__consumer_offsets在 ZooKeeper 中查看,它没有指定LogAppendTime消息时间戳,因此可以假设它只使用默认值)。唉,手动调整系统时间的快速实验表明,两者实际上都是由经纪人设置的。更重要的是,他们并不总是同意(消息的时间戳有时会稍微提前commit_timestamp)。我尝试深入研究 Kafka 代码以准确了解发生了什么,但它相当复杂,而且我对它不够熟悉,无法快速掌握。这是我的问题:

  1. 即使没有明确指定,为什么消息时间戳会__consumer_offsets自动输入?LogAppendTime难道只是用于发送提交消息的生产者将时间戳留空了?
  2. 为什么消息时间戳和commit_timestamp消息中包含的内容不一致?我似乎记得在某处读过,过去可以显式设置commit_timestamp并从而手动控制已提交偏移量的保留。
  3. 更重要的是:是否有理由使用其中一种而不是另一种?例如,如果仍然可以commit_timestamp手动设置,那么使用附加到消息的时间戳会更有意义。

我知道这是一个非常具体的问题,对大多数人来说可能并不重要。但直到现在,我总是能够通过使用 Google 并查看 Kafka 的源代码来了解后台发生的情况;然而,这个让我有点难住了。因此,任何见解都将受到高度赞赏。

小智 0

我认为后一个时间戳是过期时间。您可以尝试以下操作来确定一下吗?

"exclude.internal.topics=false"通过在 中设置参数来设置可访问的内部主题consumer.config

bin/kafka-console-consumer.sh  --consumer.config /tmp/consumer.config \
     --bootstrap-server localhost:9092 \
     --topic __consumer_offsets
Run Code Online (Sandbox Code Playgroud)

我可以看到结果如下:

[mygroup1,mytopic1,11]::[OffsetMetadata[55166421,NO_METADATA],CommitTime 1502060076305,ExpirationTime 1502146476305]
[mygroup1,mytopic1,13]::[OffsetMetadata[55037927,NO_METADATA],CommitTime 1502060076305,ExpirationTime 1502146476305]
[mygroup2,mytopic2,0]::[OffsetMetadata[126,NO_METADATA],CommitTime 1502060076343,ExpirationTime 1502146476343]
Run Code Online (Sandbox Code Playgroud)

我的机器上没有安装您在问题中提到的特定版本,因此请最终检查。