我们如何在给定的时间范围内阅读卡夫卡主题?

yuy*_*ang 8 apache-kafka

我需要从Kafka主题中读取给定时间范围内的消息.我能想到的解决方案是首先找出时间范围开始时的最大偏移量,然后继续使用消息,直到超出时间范围结束的所有分区上的偏移量.有没有更好的方法来解决这个问题?谢谢!

Gio*_*ait 7

好吧,您肯定必须首先搜索适合时间范围开头的第一个偏移量。

这可以使用KafkaConsumer#offsetsForTimes方法来完成。

该方法接受 的映射Map<TopicPartition, Long(timestamp)>,并返回 ,Map<TopicPartition, OffsetAndTimestamp>其中 的时间戳是时间戳等于或大于OffsetAndTimestamp指定时间戳的第一条消息。

从那里,您可以将消费者分配给返回的偏移量,并进行迭代,直到记录中的时间戳超过时间范围的末尾。

一些伪代码:

static void main(String[] args) {
    String topic = args[1];
    long timestampBeginning = Long.parseLong(args[2]);
    long timestampEnd = Long.parseLong(args[3]);
    TopicPartition partition = new TopicPartition(topic, 0);

    Consumer<Object, Object> consumer = createConsumer();

    long beginningOffset = consumer.offsetsForTimes(
            Collections.singletonMap(partition, timestampBeginning))
                    .get(partition).offset();

    consumer.assign(Collections.singleton(partition)); // must assign before seeking
    consumer.seek(partition, beginningOffset);

    for (ConsumerRecord<Object, Object> record : consumer.poll()) {
        if (record.timestamp() > timestampEnd) {
            break; // or whatever
        }

        // handle record
    }
}
Run Code Online (Sandbox Code Playgroud)


Sew*_*zki 0

你说的“时间范围”是什么意思?

队列中消息的时间范围或消息中的时间戳?:-)

我会考虑使用 Kafka Streams 和窗口流,或者从流中取出消息,并假设获取消息的当前时间戳是范围内的时间戳,然后考虑消息,否则就忽略它。

另一方面,如果您考虑消息中的时间戳,那么对流的小扩展(在 java DSL .filter() 方法中)将非常好地为您过滤消息。您只需要制定良好的谓词即可。

请参阅:Kafka Streams (Confluence)Kafka Streams (Apache)