Kha*_*han 39 java apache-kafka kafka-producer-api
我想在开始生产和消费工作之前确保kafka服务器是否正在运行.它是在windows环境中,这是我的kafka服务器在eclipse中的代码...
Properties kafka = new Properties();
kafka.setProperty("broker.id", "1");
kafka.setProperty("port", "9092");
kafka.setProperty("log.dirs", "D://workspace//");
kafka.setProperty("zookeeper.connect", "localhost:2181");
Option<String> option = Option.empty();
KafkaConfig config = new KafkaConfig(kafka);
KafkaServer server = new KafkaServer(config, new CurrentTime(), option);
server.startup();
Run Code Online (Sandbox Code Playgroud)
在这种情况下if (server != null)是不够的,因为它总是如此.那么有没有办法知道我的kafka服务器正在运行并为生产者做好准备.我有必要检查一下,因为它会导致丢失一些起始数据包.
谢谢.
Pau*_*rey 38
必须为所有Kafka经纪人分配一个broker.id.在启动时,代理将在Zookeeper中创建一个短暂的节点,其路径为/broker/ids/$id.由于节点是短暂的,所以一旦代理断开连接就会被移除,例如通过关闭.
您可以查看临时代理节点的列表,如下所示:
echo dump | nc localhost 2181 | grep brokers
ZooKeeper客户端接口公开了许多命令; dump列出群集的所有会话和临时节点.
注意,以上假设:
2181)上运行ZooKeeper localhost,这localhost是集群的领导者zookeeper.connectKafka配置没有为您的Kafka群集指定chroot env,即它只是host:port而不是host:port/path 您可以在您的机器上安装Kafkacat工具
例如在 Ubuntu 你可以使用安装它
apt-get install kafkacat
Run Code Online (Sandbox Code Playgroud)
安装kafkacat后,您可以使用以下命令连接它
kafkacat -b <your-ip-address>:<kafka-port> -t test-topic
Run Code Online (Sandbox Code Playgroud)
运行上述命令后,如果 kafkacat 能够建立连接,则表示 kafka 已启动并正在运行
我使用了AdminClient API。
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("connections.max.idle.ms", 10000);
properties.put("request.timeout.ms", 5000);
try (AdminClient client = KafkaAdminClient.create(properties))
{
ListTopicsResult topics = client.listTopics();
Set<String> names = topics.names().get();
if (names.isEmpty())
{
// case: if no topic found.
}
return true;
}
catch (InterruptedException | ExecutionException e)
{
// Kafka is not available
}
Run Code Online (Sandbox Code Playgroud)
保罗的回答非常好,从经纪人的角度来看,这实际上是 Kafka 和 Zk 如何协同工作。
我想说另一个检查 Kafka 服务器是否正在运行的简单选项是创建一个简单的 KafkaConsumer 指向集群并尝试一些操作,例如listTopics()。如果 kafka 服务器没有运行,你会得到一个TimeoutException然后你可以使用一个try-catch句子。
def validateKafkaConnection(kafkaParams : mutable.Map[String, Object]) : Unit = {
val props = new Properties()
props.put("bootstrap.servers", kafkaParams.get("bootstrap.servers").get.toString)
props.put("group.id", kafkaParams.get("group.id").get.toString)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val simpleConsumer = new KafkaConsumer[String, String](props)
simpleConsumer.listTopics()
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
65931 次 |
| 最近记录: |