我正在尝试创建一个函数来检查我的 AWS MSK 中是否存在主题
func KafkaTopicExist(kafkabroker string, topic string) bool {
conn, err := kafka.Dial("tcp", kafkabroker)
if err != nil {
log.WithError(err).Warn("Kafka broker connection error")
}
defer conn.Close()
partitions, err := conn.ReadPartitions()
if err != nil {
log.WithError(err).Warn("Can not get all partitions to obtain ")
}
m := map[string]struct{}{}
for _, p := range partitions {
m[p.Topic] = struct{}{}
}
_, exists := m[topic]
return exists
Run Code Online (Sandbox Code Playgroud)
kafkabroker 是一个采用 MSK TLS 专用端点的字符串。我收到连接错误。而使用 PLAINTEXT 端点则有效。我是否缺少配置中的某些内容?