如何检查Kafka Server是否正在运行?

Kha*_*han 39 java apache-kafka kafka-producer-api

我想在开始生产和消费工作之前确保kafka服务器是否正在运行.它是在windows环境中,这是我的kafka服务器在eclipse中的代码...

Properties kafka = new Properties();
kafka.setProperty("broker.id", "1");
kafka.setProperty("port", "9092");
kafka.setProperty("log.dirs", "D://workspace//");
kafka.setProperty("zookeeper.connect", "localhost:2181");    
Option<String> option = Option.empty();
KafkaConfig config = new KafkaConfig(kafka);        
KafkaServer server = new KafkaServer(config, new CurrentTime(), option);
server.startup();
Run Code Online (Sandbox Code Playgroud)

在这种情况下if (server != null)是不够的,因为它总是如此.那么有没有办法知道我的kafka服务器正在运行并为生产者做好准备.我有必要检查一下,因为它会导致丢失一些起始数据包.

谢谢.

Pau*_*rey 38

必须为所有Kafka经纪人分配一个broker.id.在启动时,代理将在Zookeeper中创建一个短暂的节点,其路径为/broker/ids/$id.由于节点是短暂的,所以一旦代理断开连接就会被移除,例如通过关闭.

您可以查看临时代理节点的列表,如下所示:

echo dump | nc localhost 2181 | grep brokers

ZooKeeper客户端接口公开了许多命令; dump列出群集的所有会话和临时节点.

注意,以上假设:

  • 您正在默认端口(2181)上运行ZooKeeper localhost,这localhost是集群的领导者
  • 您的zookeeper.connectKafka配置没有为您的Kafka群集指定chroot env,即它只是host:port而不是host:port/path

  • 因此,这实际上检查“zookeeper”是否至少连接了一个“kafka”。它不会测试 _your_ `kafka` 是否正在运行。在OP的情况下这是正确的,但这是一个间接测试。可能需要研究可以在端口 9092 上做什么来进行直接测试。 (2认同)

sel*_*t91 9

您可以在您的机器上安装Kafkacat工具

例如在 Ubuntu 你可以使用安装它

apt-get install kafkacat
Run Code Online (Sandbox Code Playgroud)

安装kafkacat后,您可以使用以下命令连接它

kafkacat -b <your-ip-address>:<kafka-port> -t test-topic
Run Code Online (Sandbox Code Playgroud)
  • <your-ip-address>替换为您的机器 ip
  • <kafka-port>可以替换为运行 kafka 的端口。通常是9092

运行上述命令后,如果 kafkacat 能够建立连接,则表示 kafka 已启动并正在运行

  • kafkacat -b localhost:9092 -L // 根据 https://docs.confluence.io/platform/current/app-development/kafkacat-usage.html,-L 显示 Kafka 集群及其主题的当前状态,分区、副本... (4认同)

Moh*_*sal 6

我使用了AdminClient API。

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("connections.max.idle.ms", 10000);
properties.put("request.timeout.ms", 5000);
try (AdminClient client = KafkaAdminClient.create(properties))
{
    ListTopicsResult topics = client.listTopics();
    Set<String> names = topics.names().get();
    if (names.isEmpty())
    {
        // case: if no topic found.
    }
    return true;
}
catch (InterruptedException | ExecutionException e)
{
    // Kafka is not available
}
Run Code Online (Sandbox Code Playgroud)

  • @Leon 我发现这个答案很有价值。当然,您需要了解您实际上“监视”的是什么。探测 Zookeeper 和探测引导服务器将回答两组不同的问题。我认为从客户端的角度来看,连接到引导服务器是正确的做法。即使您应该对 Kafka 集群(ZK 和节点)进行专门的监控,验证特定客户端是否确实可以连接到集群也是有意义的。 (2认同)
  • 这应该被标记为已接受的答案! (2认同)

小智 6

对于 Linux,“ps aux | grep kafka”查看结果中是否显示了 kafka 属性。例如 /path/to/kafka/server.properties


dbu*_*osp 5

保罗的回答非常好,从经纪人的角度来看,这实际上是 Kafka 和 Zk 如何协同工作。

我想说另一个检查 Kafka 服务器是否正在运行的简单选项是创建一个简单的 KafkaConsumer 指向集群并尝试一些操作,例如listTopics()。如果 kafka 服务器没有运行,你会得到一个TimeoutException然后你可以使用一个try-catch句子。

  def validateKafkaConnection(kafkaParams : mutable.Map[String, Object]) : Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", kafkaParams.get("bootstrap.servers").get.toString)
    props.put("group.id", kafkaParams.get("group.id").get.toString)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val simpleConsumer = new KafkaConsumer[String, String](props)
    simpleConsumer.listTopics()
  }
Run Code Online (Sandbox Code Playgroud)