我需要从Kafka主题中读取给定时间范围内的消息.我能想到的解决方案是首先找出时间范围开始时的最大偏移量,然后继续使用消息,直到超出时间范围结束的所有分区上的偏移量.有没有更好的方法来解决这个问题?谢谢!
好吧,您肯定必须首先搜索适合时间范围开头的第一个偏移量。
这可以使用KafkaConsumer#offsetsForTimes方法来完成。
该方法接受 的映射Map<TopicPartition, Long(timestamp)>
,并返回 ,Map<TopicPartition, OffsetAndTimestamp>
其中 的时间戳是时间戳等于或大于OffsetAndTimestamp
指定时间戳的第一条消息。
从那里,您可以将消费者分配给返回的偏移量,并进行迭代,直到记录中的时间戳超过时间范围的末尾。
一些伪代码:
static void main(String[] args) {
String topic = args[1];
long timestampBeginning = Long.parseLong(args[2]);
long timestampEnd = Long.parseLong(args[3]);
TopicPartition partition = new TopicPartition(topic, 0);
Consumer<Object, Object> consumer = createConsumer();
long beginningOffset = consumer.offsetsForTimes(
Collections.singletonMap(partition, timestampBeginning))
.get(partition).offset();
consumer.assign(Collections.singleton(partition)); // must assign before seeking
consumer.seek(partition, beginningOffset);
for (ConsumerRecord<Object, Object> record : consumer.poll()) {
if (record.timestamp() > timestampEnd) {
break; // or whatever
}
// handle record
}
}
Run Code Online (Sandbox Code Playgroud)
你说的“时间范围”是什么意思?
队列中消息的时间范围或消息中的时间戳?:-)
我会考虑使用 Kafka Streams 和窗口流,或者从流中取出消息,并假设获取消息的当前时间戳是范围内的时间戳,然后考虑消息,否则就忽略它。
另一方面,如果您考虑消息中的时间戳,那么对流的小扩展(在 java DSL .filter() 方法中)将非常好地为您过滤消息。您只需要制定良好的谓词即可。
请参阅:Kafka Streams (Confluence)和Kafka Streams (Apache)
归档时间: |
|
查看次数: |
1253 次 |
最近记录: |