Sou*_*abh 35 offset apache-kafka apache-zookeeper zookeeper
我正在使用zookeeper从kafka获取数据.在这里,我总是从最后一个偏移点获取数据.有没有办法指定偏移时间来获取旧数据?
有一个选项autooffset.reset.它接受最小或最大.有人可以解释什么是最小和最大的.autooffset.reset可以帮助从旧的偏移点而不是最新的偏移点获取数据吗?
小智 23
消费者始终属于一个组,并且对于每个分区,Zookeeper会跟踪该分区中该使用者组的进度.
从一开始获取数据,可以删除与进程相关联的侯赛因所指向的所有数据
ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");
Run Code Online (Sandbox Code Playgroud)
您还可以指定所需分区的偏移量,如core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala中所指定.
ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString)
Run Code Online (Sandbox Code Playgroud)
但是,偏移量不是时间索引,但您知道每个分区都是一个序列.
如果您的消息包含时间戳(并且要注意此时间戳与Kafka收到您的消息时无关),您可以尝试执行索引器,尝试通过将偏移量递增N来逐步检索一个条目,并存储元组(主题X,第2部分,偏移100,时间戳)某处.
如果要从指定的时刻检索条目,可以将二进制搜索应用于粗略索引,直到找到所需的条目并从中获取.
从Kafka 文档中他们说"kafka.api.OffsetRequest.EarliestTime()在日志中找到数据的开头并从那里开始流式传输,kafka.api.OffsetRequest.LatestTime()将仅传输新消息.不要假设偏移量0是起始偏移量,因为消息随着时间的推移而逐渐消失."
在这里使用SimpleConsumerExample:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
类似的问题:Kafka高级消费者使用Java API从主题获取所有消息(相当于 - 从 - 开始)
这可能有所帮助
| 归档时间: |
|
| 查看次数: |
42775 次 |
| 最近记录: |