如何使用 confluence-kafka-python 消费最近 N 天的消息?

wxh*_*wxh 4 python apache-kafka confluent-kafka-python

这个问题类似于Python KafkaConsumer start 消费来自时间戳的消息,除了我想知道如何在 Confluence 的官方 Python Kafka 客户端中执行此操作。

我研究了Consumer.offsets_for_times函数,但我对它接受TopicPartition.offset字段中的时间戳感到困惑。

a 如何offset等同于时间戳?

Jam*_*son 8

我最近做这个是为了工作。您需要获取 的结果offsets_for_times(),然后assign()将该列表发送给您的消费者,然后调用consume()。重要的是,不要 subscribe()进入主题。(请参阅 Eden Hill 在https://github.com/confluenceinc/confluence-kafka-python/issues/373上的评论)。

您是对的,在定义时间戳与偏移量时,该函数的文档有些令人困惑。

更新回答后续问题:

与如何使用 confluence-kafka-python 获取 Kafka 主题的最后一条消息的偏移量的区别?是不是

topicparts = [TopicPartition(topic_name, i) for i in range(0, 8)]
Run Code Online (Sandbox Code Playgroud)

你会做这样的事情:

topicparts = [TopicPartition(topic_name, i) for i in range(0, 8)]
Run Code Online (Sandbox Code Playgroud)