消费者重新启动后,kafka-python 从最后生成的消息中读取

ugo*_*ria 2 python apache-kafka kafka-python

我正在使用kafka-python来使用来自 kafka 队列(kafka 版本 0.10.2.0)的消息。特别是我使用的是KafkaConsumer类型。如果消费者停止并在一段时间后重新启动,我想从最新生成的消息重新启动,即删除消费者关闭期间生成的所有消息。我怎样才能做到这一点?

谢谢

ugo*_*ria 6

谢谢,

有用!

这是我的代码的简化版本:

consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
#dummy poll
consumer.poll()
#go to end of the stream
consumer.seek_to_end()
#start iterate
for message in consumer:
    print(message)

consumer.close()
Run Code Online (Sandbox Code Playgroud)

文档指出 poll() 方法与迭代器接口不兼容,我猜这是我在脚本末尾的循环中使用的那个。然而,从最初的测试来看,这段代码看起来可以正常工作。

使用它安全吗?还是我误解了文档?

谢谢


Mat*_*Sax 5

您不会到达seekToEnd()日志的末尾。

请记住,您首先需要订阅某个主题,然后才能进行搜索。另外,订阅是懒惰的。因此,您还需要先添加“虚拟民意调查”,然后才能进行搜索。

consumer.subscribe(...)
consumer.poll() // dummy poll
consumer.seekToEnd()

// now enter your regular poll-loop
Run Code Online (Sandbox Code Playgroud)