卡夫卡:客户已经用完了可用的经纪人

Bob*_*ith 5 go apache-kafka sarama

更新:原来我在 Docker 中的端口有问题。不知道为什么解决了这个现象。

我相信我遇到了一个奇怪的错误。我正在使用Sarama库并且能够成功创建消费者。

func main() {
 config = sarama.NewConfig()
 config.ClientID = "go-kafka-consumer"
 config.Consumer.Return.Errors = true
 // Create new consumer
 master, err := sarama.NewConsumer("localhost:9092", config)
 if err != nil {
    panic(err)
 }

 defer func() {
     if err := master.Close(); err != nil {
         panic(err)
     }
 }()

 partitionConsumer, err := master.ConsumePartition("myTopic",0, 
 sarama.OffsetOldest)
 if err != nil {
     panic(err)
 }
}
Run Code Online (Sandbox Code Playgroud)

一旦我打破这段代码并移到主程序之外,我就会遇到错误:

kafka:客户端已经没有可用的代理可以与之交谈(您的集群是否可访问?)

我已将我的代码拆分如下​​:我现在已将之前的 main() 方法转换为一个消费者包,其中包含一个名为 NewConsumer() 的方法,而我的新 main() 像这样调用 NewConsumer():

c := consumer.NewConsumer()
Run Code Online (Sandbox Code Playgroud)

恐慌声明在行中被触发sarama.NewConsumer并打印出来kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

为什么以这种方式分解我的代码会导致 Sarama 无法创建消费者?Sarama 是否需要直接从 main 运行?

ssc*_*ass 4

认为您可以通过这种方式创建 2 个或更多消费者,并将其分组为一个组(可能go-kafka-consumer)。您的代理有一个包含 1 个分区的主题,因此其中一个组被分配,另一个组会产生此错误消息。如果您将该主题的分区提高到 2,错误就会消失。\n但我认为您的问题是您以某种方式实例化了比以前更多的消费者。

\n

卡夫卡简而言之

\n
\n

消费者还可以针对给定主题组织成消费者组\xe2\x80\x94,组中的每个消费者从唯一的分区读取数据,并且该组作为一个整体消费来自整个主题的所有消息。如果您的消费者多于分区,那么一些消费者将处于空闲状态,因为他们没有可供读取的分区。如果您的分区多于消费者,那么消费者将从多个分区接收消息。如果消费者和分区数量相同,则每个消费者都会从一个分区中按顺序读取消息。

\n
\n

他们不会完全产生错误,所以这将是萨拉玛的一个问题。

\n

  • @Bob,因为仅端口转发不足以让客户端与容器中的 Kafka 进行通信 https://rmoff.net/2018/08/02/kafka-listeners-explained/ (2认同)