Kafka: how to consume data based on timestamp

Vol*_*il3 3 python apache-kafka

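Is there a way, other than working with offsets directly, to fetch data for a time interval? For example, if I want to consume all of yesterday's data, how can I do that?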

Jo *_* Ja 11

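Use offsetsForTimes to look up the offsets that correspond to the timestamps you need. In Python (kafka-python, where the method is called offsets_for_times) it looks like this: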

from datetime import datetime
from kafka import KafkaConsumer, TopicPartition

topic  = "www.kilskil.com"
broker = "localhost:9092"

# Let's count the messages from the first day of the new year.
# Naive datetimes are interpreted as local time by .timestamp().
date_in  = datetime(2019, 1, 1)
date_out = datetime(2019, 1, 2)

# Partition 0: in the simple case, without any special Kafka configuration,
# each topic has a single partition and its number is 0.
tp = TopicPartition(topic, 0)

# Assign the partition explicitly so that seek() can be called right away.
# consumer_timeout_ms ends the loop below if no message arrives for 10 s.
consumer = KafkaConsumer(bootstrap_servers=broker, consumer_timeout_ms=10000)
consumer.assign([tp])

# Two methods do the work: offsets_for_times() and seek().
# Kafka expects timestamps as integer milliseconds.
rec_in  = consumer.offsets_for_times({tp: int(date_in.timestamp() * 1000)})
rec_out = consumer.offsets_for_times({tp: int(date_out.timestamp() * 1000)})

c = 0
if rec_in[tp] is not None:  # None: no message at or after date_in
    consumer.seek(tp, rec_in[tp].offset)  # jump to the first message of the new year
    # If nothing was written at or after date_out, read up to the current end of the partition.
    end = rec_out[tp].offset if rec_out[tp] is not None else consumer.end_offsets([tp])[tp]
    for msg in consumer:
        if msg.offset >= end:
            break
        c += 1
        # each message also has a .timestamp field

print("{c} messages between {_in} and {_out}".format(c=c, _in=str(date_in), _out=str(date_out)))

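Keep in mind that Kafka measures timestamps in milliseconds and stores them as a long. Python's datetime.timestamp() returns seconds (as a float), so the value has to be multiplied by 1000. The offsets_for_times method returns a dict with TopicPartition keys and OffsetAndTimestamp values; the value is None for a partition that has no message at or after the requested timestamp.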
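For the "all of yesterday" case from the question, the conversion can be wrapped in a small helper. This is only a sketch: the names `to_epoch_ms` and `day_interval_ms` are made up for illustration, and it assumes the day boundaries should be taken in UTC unless another tzinfo is passed.

```python
from datetime import date, datetime, time, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_epoch_ms(dt):
    """Convert a datetime to integer milliseconds since the Unix epoch."""
    if dt.tzinfo is None:
        # Naive values are taken as local time, the same way .timestamp() does.
        dt = dt.astimezone()
    # Integer division of timedeltas avoids float rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)

def day_interval_ms(day, tz=timezone.utc):
    """Return (start_ms, end_ms) for one calendar day; end_ms is exclusive."""
    start = datetime.combine(day, time.min, tzinfo=tz)
    return to_epoch_ms(start), to_epoch_ms(start + timedelta(days=1))

# Bounds for yesterday (UTC), ready to be passed to offsets_for_times().
yesterday = datetime.now(timezone.utc).date() - timedelta(days=1)
start_ms, end_ms = day_interval_ms(yesterday)
```

The two values can replace the `date_in` / `date_out` conversions in the answer above.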


Kat*_*ova 3

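You can find the earliest offset at which the given interval starts and rewind to it. It is harder to tell where the interval ends, because records with earlier timestamps may arrive late. So consume records from the start of the interval until you see a record whose timestamp is later than endTime, and then read some more records to catch late messages.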
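The "read some more records" idea could be sketched in Python roughly as follows. The function name `take_interval` and the `grace` parameter are hypothetical, and the sketch only assumes that each record exposes a `.timestamp` attribute in milliseconds (as kafka-python's consumer records do); how many extra records are enough depends on how late your producers can be.

```python
from collections import namedtuple

def take_interval(records, start_ms, end_ms, grace=100):
    """Yield records with start_ms <= timestamp < end_ms.

    After the first record at or past end_ms is seen, up to `grace`
    further records are read so that late arrivals are still caught.
    """
    remaining = None  # None until the first record past end_ms shows up
    for rec in records:
        if remaining is not None:
            if remaining == 0:
                break
            remaining -= 1
        elif rec.timestamp >= end_ms:
            remaining = grace
        if start_ms <= rec.timestamp < end_ms:
            yield rec

# Stand-in records for illustration; a real consumer would be passed instead.
Record = namedtuple("Record", ["offset", "timestamp"])
demo = [Record(i, ts) for i, ts in enumerate([5, 10, 25, 12, 30, 14, 40, 15])]
picked = [r.timestamp for r in take_interval(demo, start_ms=10, end_ms=20, grace=2)]
```

With `grace=2`, the late record with timestamp 12 is still picked up, while 14 and 15 arrive after the grace window and are missed; a larger `grace` trades extra reads for fewer missed records.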

The code to rewind to startTime is:

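import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

// Assumes a `consumer` field (a KafkaConsumer with partitions already assigned)
// and org.joda.time.DateTime for the `time` argument.
public void rewind(DateTime time) {
    Map<TopicPartition, Long> query = new HashMap<>();
    for (TopicPartition topicPartition : consumer.assignment()) {
        query.put(topicPartition, time.getMillis());
    }
    Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);

    // A partition maps to null when it has no record at or after `time`;
    // in that case fall back to offset 0.
    result.forEach((topicPartition, found) ->
            consumer.seek(topicPartition, found != null ? found.offset() : 0L));
}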
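For readers following the Python answer, a rough kafka-python counterpart of this rewind might look like the sketch below. The function name `rewind` and its return value are illustrative choices, and one behaviour is deliberately swapped: where the Java version falls back to offset 0, this sketch seeks to the end of a partition that has no record at or after the timestamp, on the assumption that replaying the whole partition is not wanted.

```python
def rewind(consumer, timestamp_ms):
    """Seek every assigned partition to the first offset at or after timestamp_ms.

    `consumer` is expected to behave like kafka-python's KafkaConsumer
    (assignment(), offsets_for_times(), seek(), seek_to_end()).
    Returns a dict mapping each partition to the offset used, or None
    where the partition was moved to its end instead.
    """
    partitions = consumer.assignment()
    found = consumer.offsets_for_times({tp: timestamp_ms for tp in partitions})
    positions = {}
    for tp in partitions:
        hit = found.get(tp)
        if hit is None:
            # No record at or after the timestamp: skip to the end (assumption, see above).
            consumer.seek_to_end(tp)
            positions[tp] = None
        else:
            consumer.seek(tp, hit.offset)
            positions[tp] = hit.offset
    return positions
```

The function has to be called after partitions have been assigned, for example right after `consumer.assign([...])`.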