Bob*_*ith 5 go apache-kafka sarama
更新:原来我在 Docker 中的端口有问题。不知道为什么解决了这个现象。
我相信我遇到了一个奇怪的错误。我正在使用Sarama库并且能够成功创建消费者。
func main() {
config = sarama.NewConfig()
config.ClientID = "go-kafka-consumer"
config.Consumer.Return.Errors = true
// Create new consumer
master, err := sarama.NewConsumer("localhost:9092", config)
if err != nil {
panic(err)
}
defer func() {
if err := master.Close(); err != nil {
panic(err)
}
}()
partitionConsumer, err := master.ConsumePartition("myTopic",0,
sarama.OffsetOldest)
if err != nil {
panic(err)
}
}
Run Code Online (Sandbox Code Playgroud)
一旦我打破这段代码并移到主程序之外,我就会遇到错误:
kafka:客户端已经没有可用的代理可以与之交谈(您的集群是否可访问?)
我已将我的代码拆分如下:我现在已将之前的 main() 方法转换为一个消费者包,其中包含一个名为 NewConsumer() 的方法,而我的新 main() 像这样调用 NewConsumer():
c := consumer.NewConsumer()
Run Code Online (Sandbox Code Playgroud)
恐慌声明在行中被触发sarama.NewConsumer并打印出来kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
为什么以这种方式分解我的代码会导致 Sarama 无法创建消费者?Sarama 是否需要直接从 main 运行?
我认为您可以通过这种方式创建 2 个或更多消费者,并将其分组为一个组(可能go-kafka-consumer)。您的代理有一个包含 1 个分区的主题,因此其中一个组被分配,另一个组会产生此错误消息。如果您将该主题的分区提高到 2,错误就会消失。\n但我认为您的问题是您以某种方式实例化了比以前更多的消费者。
卡夫卡简而言之:
\n\n\n消费者还可以针对给定主题组织成消费者组\xe2\x80\x94,组中的每个消费者从唯一的分区读取数据,并且该组作为一个整体消费来自整个主题的所有消息。如果您的消费者多于分区,那么一些消费者将处于空闲状态,因为他们没有可供读取的分区。如果您的分区多于消费者,那么消费者将从多个分区接收消息。如果消费者和分区数量相同,则每个消费者都会从一个分区中按顺序读取消息。
\n
他们不会完全产生错误,所以这将是萨拉玛的一个问题。
\n| 归档时间: |
|
| 查看次数: |
14835 次 |
| 最近记录: |