使用Java为Apache Kafka 0.9创建主题

Che*_*oon 4 java apache-kafka

我正在编写一个客户端来使用kafka 0.9.我想知道如何创建一个主题.答案:如何通过Java在Kafka中创建主题与我的要求类似.除此之外,该解决方案仅适用于Kafka 0.8.2,这与Kafka 0.9的API有很大不同.

Avi*_*Avi 9

我尝试跟随Soon Chee Loong对Kafka 0.9.0.1的回答,但不得不做出一个改变.ZKStringSerializer现在是私有的.要创建ZkUtils,我使用了以下API(它在内部创建了一个ZkClient):

ZkUtils.apply(
    "zookeeper1:port1,zookeeper2:port2",
    sessionTimeoutMs,
    connectionTimeoutMs,
    false)
Run Code Online (Sandbox Code Playgroud)


Che*_*oon 8

通过scala api和各种链接在线查看.

这是我找到的解决方案:

Maven依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.0</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.7</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

码:

import java.util.Properties;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class KafkaJavaExample {

    public static void main(String[] args) {
        String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
        int sessionTimeoutMs = 10 * 1000;
        int connectionTimeoutMs = 8 * 1000;

        ZkClient zkClient = new ZkClient(
            zookeeperConnect,
            sessionTimeoutMs,
            connectionTimeoutMs,
            ZKStringSerializer$.MODULE$);

       // Security for Kafka was added in Kafka 0.9.0.0
       boolean isSecureKafkaCluster = false;
       // ZkUtils for Kafka was used in Kafka 0.9.0.0 for the AdminUtils API
       ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);

       String topic = "my-topic";
       int partitions = 2;
       int replication = 3;

       // Add topic configuration here
       Properties topicConfig = new Properties();

       AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig);
       zkClient.close();
    }
}
Run Code Online (Sandbox Code Playgroud)

如果您想知道为什么下面的代码看起来不像Java:

ZKStringSerializer$.MODULE$
Run Code Online (Sandbox Code Playgroud)

这是因为ZkStringSerializer是一个Scala对象.您可以在这里阅读更多相关信息:

如何用Java创建Kafka ZKStringSerializer?

注意:您必须使用ZKStringSerializer初始化ZkClient.
如果你不这样做,那么createTopic()似乎只能工作(换句话说:它将返回而不会出错).
该主题仅存在于Zookeeper中,仅在列出主题时有效.即下面的列表命令工作正常

bin/kafka-topics.sh --list --zookeeper localhost:2181
Run Code Online (Sandbox Code Playgroud)

但卡夫卡本身并没有创造这个话题.为了说明,下面的describe命令将引发错误.

bin/kafka-topics.sh --describe --zookeeper localhost:2181
Run Code Online (Sandbox Code Playgroud)

因此,请确保使用ZKStringSerializer $ .MODULE $初始化它.

参考资料: 我们如何使用来自-ide-using-api的APIIDE在Kafka中创建主题

不久,多伦多大学的Chee Loong