gho*_*ost 5 python apache-kafka kafka-consumer-api kafka-python
我正在尝试使用来自 Kafka 主题的消息。我在confluent_kafka消费者周围使用包装器。在开始使用消息之前,我需要检查是否建立了连接。
我读到消费者很懒惰,所以我需要执行一些操作才能建立连接。但是我想在不执行consumeorpoll操作的情况下检查连接建立。
此外,我尝试给出一些错误的配置,以查看民意调查的反应是什么。我得到的回应是:
b'Broker: No more messages'
Run Code Online (Sandbox Code Playgroud)
那么,如何判断是连接参数错误、连接中断还是主题中实际上没有消息?
恐怕没有直接的方法来测试 Kafka Brokers 是否已启动并运行。另请注意,如果您的消费者已经消费了这些消息,这并不意味着这是一种不良行为,而且显然这并不表示 Kafka 代理已关闭。
一种可能的解决方法是执行某种快速操作并查看代理是否响应。一个例子是列出主题:
使用confluent-kafka-python和AdminClient
# Example using confuent_kafka
from confluent_kafka.admin import AdminClient
kafka_broker = {'bootstrap.servers': 'localhost:9092'}
admin_client = AdminClient(kafka_broker)
topics = admin_client.list_topics().topics
if not topics:
raise RuntimeError()
Run Code Online (Sandbox Code Playgroud)
# example using kafka-python
import kafka
consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=['localhost:9092'])
topics = consumer.topics()
if not topics:
raise RuntimeError()
Run Code Online (Sandbox Code Playgroud)