如何在指定的一天从Kafka群集中获取消息或数据.例如9月13日,任何人都可以为此提供代码.我用谷歌搜索它,发现只有理论,但我想要代码
Mat*_*Sax 34
没有这种访问方法.此外,在Kafka v0.10消息不包含任何时间戳信息之前,因此无法知道消息何时被写入主题.
从Kafka开始,v0.10每条消息都包含一个元数据时间戳属性,该属性由生产者在消息创建时设置,或由代理在消息插入时设置.计划基于时间的索引,但尚未提供.因此,您需要使用整个主题并检查时间戳字段(并忽略您不感兴趣的所有消息).要找到开头,您还可以对偏移和时间戳进行二进制搜索,以便更快地找到第一条消息.
更新:
Kakfa 0.10.1添加了一个基于时间的索引.它允许seek使用时间戳等于或大于给定时间戳的第一条记录.你可以通过它来使用它KafkaConsumer#offsetsForTime().这将返回相应的偏移量,您可以将它们输入KafkaConsumer#seek().您可以只使用数据并检查记录时间戳字段,ConsumerRecord#timestamp()以查看何时可以停止处理.
请注意,数据严格按偏移排序,但不按时间戳排序.因此,在加工过程中,你可能会得到与"迟到"的记载更小的时间戳比你开始时间戳(你可以简单的跳过了这些记录虽然).
更困难的问题是在搜索间隔结束时迟到的记录.在获得时间戳大于搜索间隔的第一个时间戳之后,可能仍会有时间戳记的记录,这些记录是稍后搜索间隔的一部分(如果这些记录确实附加到主题"迟到").但是,没有办法知道这一点.因此,您可能希望继续阅读"更多"记录并检查是否存在"迟到"记录."一些记录"的含义是您自己需要做出的设计决定.
但是没有一般的指导原则 - 如果你对你的"写模式"有了更多的了解,它可以帮助你在搜索间隔"结束"之后为你想要消费的记录定义一个好的策略.当然有两种默认策略:(1)停止在第一条记录中,时间戳大于搜索间隔(并有效地忽略任何迟到的记录 - 如果使用"日志附加时间"配置,这当然是一种安全策略); (2)你读到日志的末尾 - 这是关于完整性的最安全的策略,但可能导致过高的开销(同时注意,因为记录可以随时附加,如果记录"延迟"可能是任意大的,在您到达日志结束后,甚至可能会追加延迟记录.
在实践中,考虑"最大预期延迟"并读取直到获得具有比此上限延迟更大的时间戳的记录可能是个好主意.
将其添加到当前命令 --property print.timestamp=true 这将打印时间戳 CreateTime:1609917197764。
示例: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicName --property print.timestamp=true --from-beginning
| 归档时间: |
|
| 查看次数: |
21083 次 |
| 最近记录: |