在Java中创建之前检查kafka中主题的存在

nis*_*ntv 6 java apache-kafka

我正在尝试使用以下方法在kafka 0.8.2中创建一个主题:

AdminUtils.createTopic(zkClient, myTopic, 2, 1, properties);
Run Code Online (Sandbox Code Playgroud)

如果我多次在本地运行代码以进行测试,则失败,因为已经创建了该主题。创建主题之前,有没有办法检查主题是否存在?该TopicCommandAPI似乎不为任何回报listTopicsdescribeTopic

Rag*_*ddy 6

您可以使用 kakfa-client 版本 0.11.0.0 中的 AdminClient

示例代码:

    Properties config = new Properties();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhist:9091");

    AdminClient admin = AdminClient.create(config);
    ListTopicsResult listTopics = admin.listTopics();
    Set<String> names = listTopics.names().get();
    boolean contains = names.contains("TEST_6");
    if (!contains) {
        List<NewTopic> topicList = new ArrayList<NewTopic>();
        Map<String, String> configs = new HashMap<String, String>();
        int partitions = 5;
        Short replication = 1;
        NewTopic newTopic = new NewTopic("TEST_6", partitions, replication).configs(configs);
        topicList.add(newTopic);
        admin.createTopics(topicList);
    }
Run Code Online (Sandbox Code Playgroud)


C4s*_*tor 4

为此,您可以使用该方法,如果主题已经存在,AdminUtils.topicExists(ZkUtils zkClient, String topic)它将返回,否则返回。truefalse

那么你的代码将是这样的:

if (!AdminUtils.topicExists(zkClient, myTopic)){
    AdminUtils.createTopic(zkClient, myTopic, 2, 1, properties);
}
Run Code Online (Sandbox Code Playgroud)