如何以编程方式检查 Kafka Broker 是否已启动并在 Python 中运行

gho*_*ost 5 python apache-kafka kafka-consumer-api kafka-python

我正在尝试使用来自 Kafka 主题的消息。我在confluent_kafka消费者周围使用包装器。在开始使用消息之前,我需要检查是否建立了连接。

我读到消费者很懒惰,所以我需要执行一些操作才能建立连接。但是我想在不执行consumeorpoll操作的情况下检查连接建立。

此外,我尝试给出一些错误的配置,以查看民意调查的反应是什么。我得到的回应是:

b'Broker: No more messages'
Run Code Online (Sandbox Code Playgroud)

那么,如何判断是连接参数错误、连接中断还是主题中实际上没有消息?

Gio*_*ous 8

恐怕没有直接的方法来测试 Kafka Brokers 是否已启动并运行。另请注意,如果您的消费者已经消费了这些消息,这并不意味着这是一种不良行为,而且显然这并不表示 Kafka 代理已关闭。


一种可能的解决方法是执行某种快速操作并查看代理是否响应。一个例子是列出主题:

使用confluent-kafka-pythonAdminClient

# Example using confuent_kafka
from confluent_kafka.admin import AdminClient

kafka_broker = {'bootstrap.servers': 'localhost:9092'}
admin_client = AdminClient(kafka_broker)
topics = admin_client.list_topics().topics

if not topics: 
    raise RuntimeError()
Run Code Online (Sandbox Code Playgroud)

使用kafka-pythonKafkaConsumer

# example using kafka-python
import kafka


consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=['localhost:9092'])
topics = consumer.topics()

if not topics: 
    raise RuntimeError()
Run Code Online (Sandbox Code Playgroud)