我正在使用 kafka-net 客户端向 kafka 发送消息。我只是想知道是否有任何方法可以检查 kafka 服务器是否已启动并可以接收消息。我关闭了 kafka,但是已经成功创建了生产者并且 SendMessageAsync 只是冻结了很长时间。我试图通过超时,但它没有改变任何东西。我使用 kafka-net 0.9 当 kafka 服务器启动并运行时它工作得很好
Broker的id在zookeeper()中注册/brokers/ids/[brokerId]
为临时节点,这允许其他broker和消费者检测故障。(目前健康的定义相当幼稚。如果在zk中注册,则/brokers/ids/[brokerId]
broker是健康的,否则它是死的)。
只要代理的会话处于活动状态,zookeeper 临时节点就存在。
您可以通过 ZkUtils.getSortedBrokerList(zkClient) 检查代理是否已启动,它返回以下所有活动代理 ID/brokers/ids
import org.I0Itec.zkclient.ZkClient;
ZkClient zkClient = new ZkClient(properties.getProperty("zkQuorum"), zkSessionTimeout, zkConnectionTimeout,ZKStringSerializer$.MODULE$);
ZkUtils.getSortedBrokerList(zkClient);
Run Code Online (Sandbox Code Playgroud)
Zookeeper中参考
Kafka数据结构