kafka-python消费者没有收到消息

Kar*_*Raj 3 python apache-kafka kafka-consumer-api kafka-python

我无法KafaConsumer从头开始读取,或从任何其他显式偏移读取.

为同一主题的使用者运行命令行工具,我确实看到带有该--from-beginning选项的消息,否则它会挂起

$ ./kafka-console-consumer.sh --zookeeper {localhost:port} --topic {topic_name} --from-beginning
Run Code Online (Sandbox Code Playgroud)

如果我通过python运行它,它会挂起,我怀疑是由不正确的消费者配置引起的

consumer = KafkaConsumer(topic_name,
                     bootstrap_servers=['localhost:9092'],
                     group_id=None,
                     auto_commit_enable=False,
                     auto_offset_reset='smallest')

print "Consuming messages from the given topic"
for message in consumer:
    print "Message", message
    if message is not None:
        print message.offset, message.value

print "Quit"
Run Code Online (Sandbox Code Playgroud)

输出:

使用来自给定主题的消息(之后挂起)

我使用kafka-python 0.9.5并且代理运行kafka 8.2.不确定究竟是什么问题.

按照dpkp的建议设置_group_id = None_以模拟控制台使用者的行为.

小智 8

auto_offset_reset='earliest' 为我解决了这个问题。


小智 8

我遇到了同样的问题:我可以在 kafka 控制台中接收,但无法使用 package 的 python 脚本获取消息kafka-python

最后我发现原因是我没有打电话,producer.flush()并且在producer.close()我的文档producer.py中没有提到。


tec*_*kuz 7

auto_offset_reset='earliest'group_id=None为我解决了它。


dpk*_*pkp 6

您发布的console-consumer和python使用者代码之间的区别在于python使用者使用使用者组来保存偏移量:group_id="test-consumer-group".如果您设置group_id = None,则应该看到与控制台使用者相同的行为.