如何使用Java从Kafka获取最近5天的消息

Sat*_*Sat 1 java apache-kafka kafka-consumer-api

我已经为Kafka中的主题设置了TTL为7天,我从Kafka数据库中获取数据并将其存储在数据库中,但是从过去5天开始我的数据库服务器已关闭,现在我必须从过去5天获取消息Kafka并将其存储在数据库中注意:从过去5天开始没有问题Kafka.

Han*_*sen 6

首先调用consumer.partitionsFor()方法来获取主题的分区

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#partitionsFor(java.lang.String)

然后调用consumer.offsetsForTimes()以获取成功处理最后一条消息的5天前时间戳的每个分区的偏移量.

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes(java.util.Map)

然后调用consumer.seek()来定位当前消费者偏移量并继续调用poll()并像平常一样处理消息.

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition,%20long)