Python-读取所有消息后退出 Kafka 队列

Kev*_*ash 5 python apache-kafka kafka-consumer-api

我正在尝试使用 Python 读取 Kafka 队列的一些数据,如下面的代码所示:

from kafka import KafkaConsumer
import sys
import json 
import pandas as pd


bootstrap_servers = [localhost]
topicName = 'topic'
consumer = KafkaConsumer (topicName, group_id = 'topic',bootstrap_servers = bootstrap_servers, auto_offset_reset = 'earliest')

data_list = []
for message in consumer:
    print(message)
    data = json.loads(message.value)
    df = pd.json_normalize(data)
    data_list.append(df)
Run Code Online (Sandbox Code Playgroud)

这似乎永远在循环中运行,除非我终止连接。有没有办法在读取所有消息后或者队列中没有新消息后停止/退出此循环?

小智 2

poll方法应该是您正在寻找的

只需注意max_records参数,如果不更改,默认为max_poll_records500 条记录