Trầ*_* Dự 1 go apache-kafka confluent-kafka
我正在为我的Kafka客户端使用融合golang.我用来AdminClient在kafka集群中创建/删除/获取主题.这是我的初始化代码AdminClient
adminClient, err := kafka.NewAdminClient(&kafka.ConfigMap{
"bootstrap.servers": 127.0.0.1:9092,
})
Run Code Online (Sandbox Code Playgroud)
之后,我使用此类创建并获取kafka集群中的所有主题.以下是创建主题的代码:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
results, err := adminClient.CreateTopics(
ctx,
[]kafka.TopicSpecification{{
Topic: topic,
NumPartitions: numPartitions,
ReplicationFactor: replicationFactor}},
kafka.SetAdminOperationTimeout(TimeOut),
)
Run Code Online (Sandbox Code Playgroud)
之后,我再次获得主题信息:
result, err := adminClient.GetMetadata(&topic, false, 1000)
Run Code Online (Sandbox Code Playgroud)
问题是:如果我得到之前不存在的主题,kafka将自动创建该主题.这是我不想要的行为.请告诉我如何解决这个问题.