如何获取kafka偏移量数据,在时间戳上指定

iti*_*pa1 5 java apache-kafka kafka-consumer-api

当我尝试运行它时,我尝试根据时间戳获取 Kafka 主题的偏移量,它抛出空指针错误,

Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
              for (TopicPartition partition : partitions) {
                timestampsToSearch.put(partition,  startTimestamp);
              }
Map<TopicPartition, OffsetAndTimestamp> outOffsets = consumer.offsetsForTimes(timestampsToSearch);
              for (TopicPartition partition : partitions) {
                Long seekOffset = outOffsets.get(partition).offset();
consumer.seek(partition, seekOffset);
Run Code Online (Sandbox Code Playgroud)

任何帮助将不胜感激。

Mic*_*son 6

要查找与时间戳对应的偏移量,您需要使用该offsetsForTimes()方法。

例如,这将打印mytopic1 秒前对应的分区 0 的偏移量:

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);) {
    Map<TopicPartition, Long> timestamps = new HashMap<>();
    timestamps.put(new TopicPartition("mytopic", 0), System.currentTimeMillis()-1*1000);
    Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
    System.err.println(offsets);
}
Run Code Online (Sandbox Code Playgroud)

这将显示类似以下内容:

{offset-test-0=(timestamp=1561469319192, leaderEpoch=0, offset=100131)}
Run Code Online (Sandbox Code Playgroud)