Zei*_*zar 1 python apache-kafka kafka-consumer-api kafka-python
我已经使用kafka-python
库编写了一个 python 脚本,它将消息写入和读取到kafka
. 我写消息没有任何问题;kafka
我可以使用控制台工具检索它们。但我无法使用我的 python 脚本读取它们。我的消费者有一个 for ,它冻结在迭代的第一行并且永远不会返回。这是我的代码:
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"my-topic",
bootstrap_servers="localhost:9092"),
value_deserializer=lambda v: json.dumps(v).encode("utf-8")
)
for msg in consumer:
print(type(msg))
Run Code Online (Sandbox Code Playgroud)
消费者被创建并完全订阅;我可以看到它my-topic
列在其属性的主题列表中_client
。
任何想法?
默认情况下,kafka python 从最后一个偏移量开始,即只会读取新消息。一种方法是从头开始读取,或者另一种方法是使轮询主题处于无限循环中,如下代码所示:
while True:
try:
records = consumer.poll(60 * 1000) # timeout in millis , here set to 1 min
record_list = []
for tp, consumer_records in records.items():
for consumer_record in consumer_records:
record_list.append(consumer_record.value)
print(record_list) # record_list will be list of dictionaries
Run Code Online (Sandbox Code Playgroud)
要从头开始阅读,我们需要auto_offset_reset=earliest
在创建消费者对象时添加早期内容
consumer = KafkaConsumer(
"my-topic",
bootstrap_servers="localhost:9092"),
value_deserializer=lambda v: json.dumps(v).encode("utf-8"),
auto_offset_reset='earliest')
Run Code Online (Sandbox Code Playgroud)
让我知道这是否有帮助!