来自kafka的元数据信息

use*_*608 5 apache-kafka kafka-producer-api confluent-schema-registry confluent-platform

我是 Confluence/Kafka 的新手,我想从 kafka 中查找元数据信息

我想知道

  1. 生产者名单
  2. 主题列表
  3. 主题的架构信息

Confluence版本是5.0

可以提供此信息的类(方法)是什么?
是否有任何 Rest API 用于相同的情况?
还需要连接 Zookeeper 才能获取此信息。

Gio*_*ous 4

1)我认为 Kafka 代理不知道在主题中生成消息的生产者,因此没有命令行工具来列出它们。然而,这个SO 问题的答案表明您可以通过查看 JMX 上的 MBean 来列出生产者。


2)为了列出您需要运行的主题:

kafka-topics --zookeeper localhost:2181 --list
Run Code Online (Sandbox Code Playgroud)

否则,如果您想使用 Java 客户端列出主题,您可以调用listTopics()的方法KafkaConsumer

您还可以通过 ZooKeeper 获取主题列表

ZkClient zkClient = new ZkClient("zkHost:zkPort");
List<String> topics = JavaConversions.asJavaList(ZkUtils.getAllTopics(zkClient));
Run Code Online (Sandbox Code Playgroud)


3) 要获取主题的架构信息,您可以使用架构注册表 API

特别是,您可以通过调用以下方式获取所有主题:

GET /subjects HTTP/1.1
Host: schemaregistry.example.com
Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json
Run Code Online (Sandbox Code Playgroud)

它应该给出类似于下面的响应:

HTTP/1.1 200 OK
Content-Type: application/vnd.schemaregistry.v1+json

["subject1", "subject2"]
Run Code Online (Sandbox Code Playgroud)

然后您可以获得特定主题的所有版本:

GET /subjects/subject-name/versions HTTP/1.1
Host: schemaregistry.example.com
Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json
Run Code Online (Sandbox Code Playgroud)

最后,您可以获得在该主题下注册的特定版本的架构

GET /subjects/subject_name/versions/1 HTTP/1.1
Host: schemaregistry.example.com
Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json
Run Code Online (Sandbox Code Playgroud)

或者只是最新注册的架构:

GET /subjects/subject-name/versions/latest HTTP/1.1
Host: schemaregistry.example.com
Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json
Run Code Online (Sandbox Code Playgroud)

为了在 Java 中执行此类操作,您可以准备自己的 GET 请求(请参阅此处的操作方法)或使用 Confluence 的架构注册表 Java 客户端。您可以在其Github 存储库中查看实现和可用方法。


关于您关于 Zookeeper 的问题,请注意ZK 是 Kafka 的要求

Kafka 使用 ZooKeeper,因此如果您还没有 ZooKeeper 服务器,则需要先启动它。您可以使用 kafka 打包的便捷脚本来获取快速且肮脏的单节点 ZooKeeper 实例。