Kafka消费者获取主题的元数据失败

jwi*_*ner 37 apache-kafka kafka-consumer-api

我正在尝试为第三方的Kafka和ZooKeeper服务器编写Java客户端.我能够列出和描述主题,但是当我尝试阅读任何主题时,ClosedChannelException会引发a.我使用命令行客户端在这里重现它们.

$ bin/kafka-console-consumer.sh --zookeeper 255.255.255.255:2181 --topic eventbustopic
[2015-06-02 16:23:04,375] WARN Fetching topic metadata with correlation id 0 for topics [Set(eventbustopic)] from broker [id:1,host:SOME_HOST,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException                                       
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)           
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)        
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)                
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)        
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)        
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)         
[2015-06-02 16:23:04,515] WARN Fetching topic metadata with correlation id 0 for topics [Set(eventbustopic)] from broker [id:0,host:SOME_HOST,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException                                       
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)           
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)        
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)                
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)        
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)        
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)         
Run Code Online (Sandbox Code Playgroud)

备用命令成功:

$ bin/kafka-topics.sh --describe --zookeeper 255.255.255.255:2181 --topic eventbustopic
Topic:eventbustopic   PartitionCount:2        ReplicationFactor:1     Configs:
    Topic: eventbustopic  Partition: 0    Leader: 1       Replicas: 1     Isr: 1
    Topic: eventbustopic  Partition: 1    Leader: 0       Replicas: 0     Isr: 0

$ bin/kafka-topics.sh --list --zookeeper 255.255.255.255:2181 --topic eventbustopic
eventbustopic
Run Code Online (Sandbox Code Playgroud)

(ips被编辑并替换为255.255.255.255)

当我谷歌这个例外时,我看到生产者方面的问题 - 事实上,来源ClientUtils.fetchTopicMetadata表明这主要是由生产者使用.

我担心的一个问题是,这可能是网络布局的一个产物:数据包被Haproxy破坏并通过VPN发送.

究竟什么在这里工作?

Joh*_*rop 46

代理告诉客户端应该使用哪个主机名来生成/使用消息.默认情况下,Kafka使用其运行的系统的主机名.如果客户端无法解析此主机名,则会出现此异常.

您可以尝试将advertised.host.nameKafka配置设置为客户端应使用的主机名/地址.

  • 对于其他遇到此问题的人来说,也可能是JVM有-Xmx或资源限制(Docker)太低.你得到完全相同的错误. (3认同)
  • @DonovanMuller 您的评论可能应该作为对这个问题的完整答案发布。这是解决我的问题的一个答案,我怀疑它可能会以同样的方式帮助其他人,但作为评论很容易错过 (2认同)

djz*_*zhu 11

这是我解决这个问题的方法:

  1. 运行bin/kafka-server-stop.sh以停止运行kafka服务器.
  2. config/server.properties通过添加一行来修改属性文件: listeners=PLAINTEXT://{ip.of.your.kafka.server}:9092
  3. 重启kafka服务器.

由于没有lisener设置,kafka将用于java.net.InetAddress.getCanonicalHostName()获取套接字服务器侦听的地址.