使用 kafka-python 检索主题中的消息

Zei*_*zar 1 python apache-kafka kafka-consumer-api kafka-python

我已经使用kafka-python库编写了一个 python 脚本,它将消息写入和读取到kafka. 我写消息没有任何问题;kafka我可以使用控制台工具检索它们。但我无法使用我的 python 脚本读取它们。我的消费者有一个 for ,它冻结在迭代的第一行并且永远不会返回。这是我的代码:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "my-topic",
    bootstrap_servers="localhost:9092"),
    value_deserializer=lambda v: json.dumps(v).encode("utf-8")
)

for msg in consumer:
    print(type(msg))
Run Code Online (Sandbox Code Playgroud)

消费者被创建并完全订阅;我可以看到它my-topic列在其属性的主题列表中_client

任何想法?

Ana*_*Sai 6

默认情况下,kafka python 从最后一个偏移量开始,即只会读取新消息。一种方法是从头开始读取,或者另一种方法是使轮询主题处于无限循环中,如下代码所示:

while True:
    try:
        records = consumer.poll(60 * 1000) # timeout in millis , here set to 1 min

        record_list = []
        for tp, consumer_records in records.items():
            for consumer_record in consumer_records:
                record_list.append(consumer_record.value)
        print(record_list) # record_list will be list of dictionaries
Run Code Online (Sandbox Code Playgroud)

编辑

要从头开始阅读,我们需要auto_offset_reset=earliest在创建消费者对象时添加早期内容

consumer = KafkaConsumer(
    "my-topic",
    bootstrap_servers="localhost:9092"),
    value_deserializer=lambda v: json.dumps(v).encode("utf-8"),
    auto_offset_reset='earliest')
Run Code Online (Sandbox Code Playgroud)

让我知道这是否有帮助!