我正在尝试使用以下方法在kafka 0.8.2中创建一个主题:
AdminUtils.createTopic(zkClient, myTopic, 2, 1, properties);
Run Code Online (Sandbox Code Playgroud)
如果我多次在本地运行代码以进行测试,则失败,因为已经创建了该主题。创建主题之前,有没有办法检查主题是否存在?该TopicCommand
API似乎不为任何回报listTopics
或describeTopic
。
您可以使用 kakfa-client 版本 0.11.0.0 中的 AdminClient
示例代码:
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhist:9091");
AdminClient admin = AdminClient.create(config);
ListTopicsResult listTopics = admin.listTopics();
Set<String> names = listTopics.names().get();
boolean contains = names.contains("TEST_6");
if (!contains) {
List<NewTopic> topicList = new ArrayList<NewTopic>();
Map<String, String> configs = new HashMap<String, String>();
int partitions = 5;
Short replication = 1;
NewTopic newTopic = new NewTopic("TEST_6", partitions, replication).configs(configs);
topicList.add(newTopic);
admin.createTopics(topicList);
}
Run Code Online (Sandbox Code Playgroud)
为此,您可以使用该方法,如果主题已经存在,AdminUtils.topicExists(ZkUtils zkClient, String topic)
它将返回,否则返回。true
false
那么你的代码将是这样的:
if (!AdminUtils.topicExists(zkClient, myTopic)){
AdminUtils.createTopic(zkClient, myTopic, 2, 1, properties);
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
7338 次 |
最近记录: |