检查系统是否连接到Kafka

use*_*630 6 java smoke-testing apache-kafka

我需要在Java中编写一个冒烟测试,验证系统是否连接到kafka,

有谁有想法吗?我发现这篇文章:

如何检查Kafka Server是否正在运行?

但是从Java代码中做起来太复杂了,我不认为这是我应该使用的方向.

提前致谢.

dbu*_*osp 8

我有同样的问题,我不想在没有任何答案的情况下留下这个问题.我读了很多关于如何检查连接的内容,我发现的大部分答案都是检查与Zk的连接,但我真的想直接检查与Kafka服务器的连接.

我所做的是创建一个简单的KafkaConsumer并列出listTopics()的所有主题.如果连接成功,那么您将得到一些回报.否则,你会得到一个TimeoutException.

  def validateKafkaConnection(kafkaParams : mutable.Map[String, Object]) : Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", kafkaParams.get("bootstrap.servers").get.toString)
    props.put("group.id", kafkaParams.get("group.id").get.toString)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val simpleConsumer = new KafkaConsumer[String, String](props)
    simpleConsumer.listTopics()
  }
Run Code Online (Sandbox Code Playgroud)

然后你可以将这个方法包装在一个try-catch句子中以捕获异常.


Nic*_*ico 3

编辑:这是非常古老的卡夫卡。2023 年不要使用这个:)


您可以使用以下命令检查服务器是否正在运行:

ZkClient zkClient = new ZkClient("your_zookeeper_server", 5000 /* ZOOKEEPER_SESSION_TIMEOUT */, 5000 /* ZOOKEEPER_CONNECTION_TIMEOUT */, ZKStringSerializer$.MODULE$);
List<Broker> brokers = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster());
if (brokers.isEmpty()) {
    // No brokers available
} else {
    // There are brokers available
}
Run Code Online (Sandbox Code Playgroud)

  • 这是一种检查 Zookeeper 连接的方法,而不是 Kafka 服务器。 (4认同)