ffr*_*end 26 apache-kafka apache-zookeeper
出于特殊原因,我需要同时使用 - ConsumerGroup(也称为高级消费者)和SimpleConsumer(也称为低级消费者)来阅读Kafka.因为ConsumerGroup我使用基于ZooKeeper的配置并且对它完全满意,但SimpleConsumer需要实例化种子代理.
我不想保留两者的列表 - ZooKeeper和代理主机.因此,我正在寻找一种从ZooKeeper 自动发现特定主题的代理的方法.
由于一些间接信息,我相信这些数据存储在ZooKeeper中的以下路径之一:
/brokers/topics/<topic>/partitions/<partition-id>/state 但是,当我尝试从这些节点读取数据时,我收到序列化错误(我正在使用com.101tec.zkclient此代码):
org.I0Itec.zkclient.exception.ZkMarshallingError:java.io.StreamCorruptedException:无效的流标题:位于org.I0Itec.zkclient.ZkClient.derializable的org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37)中的7B226A6D (zkClient.java:740)org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:773)org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)atg.I0Itec.zkclient.ZkClient. readData(ZkClient.java:750)at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744)... 64 elided引起:java.io.StreamCorruptedException:无效的流标题:java.io.ObjectInputStream中的7B226A6D位于org.I0Itec.zkclient.serialize的org.I0Itec.zkclient.serialize.TcclAwareObjectIputStream.(TcclAwareObjectIputStream.java :)的java.io.ObjectInputStream.(ObjectInputStream.java:299)中的.readStreamHeader(ObjectInputStream.java:804). SerializableSerializer.deserialize(SerializableSerializer.java:31)... 69更多
我可以毫无问题地编写和读取自定义Java对象(例如字符串),所以我认为这不是客户端的问题,而是相当棘手的编码.因此,我想知道:
Hee*_*jin 30
这就是我的一位同事为了获得卡夫卡经纪人名单而采取的方式.我想当你想要动态获取代理列表时,这是一种正确的方法.
这是一个示例代码,显示如何获取列表.
public class KafkaBrokerInfoFetcher {
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
List<String> ids = zk.getChildren("/brokers/ids", false);
for (String id : ids) {
String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
System.out.println(id + ": " + brokerInfo);
}
}
}
Run Code Online (Sandbox Code Playgroud)
将代码运行到由三个代理组成的集群上会导致
1: {"jmx_port":-1,"timestamp":"1428512949385","host":"192.168.0.11","version":1,"port":9093}
2: {"jmx_port":-1,"timestamp":"1428512955512","host":"192.168.0.11","version":1,"port":9094}
3: {"jmx_port":-1,"timestamp":"1428512961043","host":"192.168.0.11","version":1,"port":9095}
Run Code Online (Sandbox Code Playgroud)
ffr*_*end 14
事实证明,Kafka用于ZKStringSerializer读取和写入数据到znodes.因此,要修复错误,我只需将其添加为ZkClient构造函数中的最后一个参数:
val zkClient = new ZkClient(zkQuorum, Integer.MAX_VALUE, 10000, ZKStringSerializer)
Run Code Online (Sandbox Code Playgroud)
使用它,我写了几个有用的函数来发现经纪人id,他们的地址和其他东西:
import kafka.utils.Json
import kafka.utils.ZKStringSerializer
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.KafkaException
def listBrokers(): List[Int] = {
zkClient.getChildren("/brokers/ids").toList.map(_.toInt)
}
def listTopics(): List[String] = {
zkClient.getChildren("/brokers/topics").toList
}
def listPartitions(topic: String): List[Int] = {
val path = "/brokers/topics/" + topic + "/partitions"
if (zkClient.exists(path)) {
zkClient.getChildren(path).toList.map(_.toInt)
} else {
throw new KafkaException(s"Topic ${topic} doesn't exist")
}
}
def getBrokerAddress(brokerId: Int): (String, Int) = {
val path = s"/brokers/ids/${brokerId}"
if (zkClient.exists(path)) {
val brokerInfo = readZkData(path)
(brokerInfo.get("host").get.asInstanceOf[String], brokerInfo.get("port").get.asInstanceOf[Int])
} else {
throw new KafkaException("Broker with ID ${brokerId} doesn't exist")
}
}
def getLeaderAddress(topic: String, partitionId: Int): (String, Int) = {
val path = s"/brokers/topics/${topic}/partitions/${partitionId}/state"
if (zkClient.exists(path)) {
val leaderStr = zkClient.readData[String](path)
val leaderId = Json.parseFull(leaderStr).get.asInstanceOf[Map[String, Any]].get("leader").get.asInstanceOf[Int]
getBrokerAddress(leaderId)
} else {
throw new KafkaException(s"Topic (${topic}) or partition (${partitionId}) doesn't exist")
}
}
Run Code Online (Sandbox Code Playgroud)
要使用shell执行此操作:
zookeeper-shell myzookeeper.example.com:2181
ls /brokers/ids
=> [2, 1, 0]
get /brokers/ids/2
get /brokers/ids/1
get /brokers/ids/0
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
28093 次 |
| 最近记录: |