How can we create a topic in Kafka from the IDE using the API

ram*_*amu 49 java apache-kafka

How can we create a topic in Kafka from the IDE using the API? When I run this:

bin/kafka-create-topic.sh --topic mytopic --replica 3 --zookeeper localhost:2181

I get the error:

bash: bin/kafka-create-topic.sh: No such file or directory

I followed the developer setup exactly as described.

Mic*_*oll 72

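In Kafka 0.8.1+ (the latest version of Kafka at the time of writing) you can create a new topic programmatically via AdminUtils. The functionality of CreateTopicCommand (part of the older Kafka 0.8.0), which one of the earlier answers to this question mentions, was moved to AdminUtils.

Scala example for Kafka 0.8.1: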

import java.util.Properties

import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

// Create a ZooKeeper client
val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
// Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
// createTopic() will only seem to work (it will return without error).  The topic will exist in
// only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
// topic.
val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs, connectionTimeoutMs,
    ZKStringSerializer)

// Create a topic named "myTopic" with 8 partitions and a replication factor of 3
val topicName = "myTopic"
val numPartitions = 8
val replicationFactor = 3
val topicConfig = new Properties
AdminUtils.createTopic(zkClient, topicName, numPartitions, replicationFactor, topicConfig)

Build dependencies with sbt, for example:

libraryDependencies ++= Seq(
  "com.101tec" % "zkclient" % "0.4",
  "org.apache.kafka" % "kafka_2.10" % "0.8.1.1"
    exclude("javax.jms", "jms")
    exclude("com.sun.jdmk", "jmxtools")
    exclude("com.sun.jmx", "jmxri"),
  ...
)

EDIT: Added a Java example for Kafka 0.9.0.0 (the latest version as of January 2016).

Maven dependencies:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.0</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.7</version>
</dependency>

Code:

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class KafkaJavaExample {

  public static void main(String[] args) {
    String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
    int sessionTimeoutMs = 10 * 1000;
    int connectionTimeoutMs = 8 * 1000;
    // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
    // createTopic() will only seem to work (it will return without error).  The topic will exist in
    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
    // topic.
    ZkClient zkClient = new ZkClient(
        zookeeperConnect,
        sessionTimeoutMs,
        connectionTimeoutMs,
        ZKStringSerializer$.MODULE$);

    // Security for Kafka was added in Kafka 0.9.0.0
    boolean isSecureKafkaCluster = false;
    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);

    String topic = "my-topic";
    int partitions = 2;
    int replication = 3;
    Properties topicConfig = new Properties(); // add per-topic configurations settings here
    AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig);
    zkClient.close();
  }

}

EDIT 2: Added a Java example for Kafka 0.10.2.0 (the latest version as of April 2017).

Maven dependencies:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.0</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.9</version>
</dependency>

Code:

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class KafkaJavaExample {

  public static void main(String[] args) {
    String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
    int sessionTimeoutMs = 10 * 1000;
    int connectionTimeoutMs = 8 * 1000;

    String topic = "my-topic";
    int partitions = 2;
    int replication = 3;
    Properties topicConfig = new Properties(); // add per-topic configurations settings here

    // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
    // createTopic() will only seem to work (it will return without error).  The topic will exist in
    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
    // topic.
    ZkClient zkClient = new ZkClient(
        zookeeperConnect,
        sessionTimeoutMs,
        connectionTimeoutMs,
        ZKStringSerializer$.MODULE$);

    // Security for Kafka was added in Kafka 0.9.0.0
    boolean isSecureKafkaCluster = false;

    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
    AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
    zkClient.close();
  }

}

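  • Note that if the ZkClient is not initialized with ZKStringSerializer, createTopic returns without an error. The topic will exist in ZooKeeper and will be returned when listing topics, but Kafka itself does not create the topic. (7 upvotes)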
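
The note about ZKStringSerializer is easier to follow once you look at the bytes involved. The sketch below uses only the JDK and assumes that ZkClient's default serializer relies on Java object serialization, while ZKStringSerializer writes plain UTF-8 strings. The class name SerializerDemo and the sample payload are made up for illustration and are not what Kafka actually stores. It only shows that the two encodings of the same string differ, which would explain why data written with the default serializer is not usable by the brokers.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class SerializerDemo {

    // Encodes the value the way a plain UTF-8 string serializer would.
    static byte[] utf8Bytes(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Encodes the value with Java object serialization, which adds a stream
    // header and type information in front of the characters.
    static byte[] javaSerializedBytes(String value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        String payload = "{\"version\":1}"; // hypothetical znode content
        byte[] plain = utf8Bytes(payload);
        byte[] serialized = javaSerializedBytes(payload);
        System.out.println("UTF-8 length:           " + plain.length);
        System.out.println("Java-serialized length: " + serialized.length);
        System.out.println("Same bytes? " + Arrays.equals(plain, serialized));
    }
}
```

A reader that expects UTF-8 text would see the extra leading bytes of the serialized form as garbage, so passing the serializer explicitly, as the examples in this answer do, avoids the mismatch.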

Dmi*_*sky 34

As of 0.11.0.0, all you need is:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

This artifact now contains the AdminClient (org.apache.kafka.clients.admin).

AdminClient can handle many Kafka admin tasks, including topic creation:

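import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");

AdminClient admin = AdminClient.create(config);

Map<String, String> configs = new HashMap<>();
int partitions = 1;
short replication = 1; // NewTopic takes the replication factor as a short

admin.createTopics(Arrays.asList(new NewTopic("topic", partitions, replication).configs(configs)));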

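This call returns a CreateTopicsResult, which you can use to get a Future for the whole operation or for each individual topic creation:

  • To get a future for the whole operation, use CreateTopicsResult#all().
  • To get futures for the topics individually, use CreateTopicsResult#values().

For example: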

CreateTopicsResult result = ...
KafkaFuture<Void> all = result.all();

Or:

CreateTopicsResult result = ...
for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
    try {
        entry.getValue().get();
        log.info("topic {} created", entry.getKey());
    } catch (InterruptedException | ExecutionException e) {
        if (Throwables.getRootCause(e) instanceof TopicExistsException) {
            log.info("topic {} existed", entry.getKey());
        }
    }
}
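
The loop above relies on Throwables.getRootCause from Guava to unwrap the ExecutionException. If Guava is not on the classpath, a small helper along the following lines might do the same job. This is a JDK-only sketch: the class name RootCauseDemo and the rootCause helper are hypothetical, and IllegalStateException merely stands in for Kafka's TopicExistsException so that the example runs without a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class RootCauseDemo {

    // Follows the cause chain until it reaches a throwable with no cause.
    static Throwable rootCause(Throwable error) {
        Throwable current = error;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Void> future = new CompletableFuture<>();
        // Stand-in failure; with Kafka this would be a TopicExistsException.
        future.completeExceptionally(new IllegalStateException("topic already exists"));
        try {
            future.get();
        } catch (ExecutionException e) {
            // get() wraps the failure, so inspect the root cause instead of e itself.
            Throwable root = rootCause(e);
            System.out.println(root.getClass().getSimpleName() + ": " + root.getMessage());
        }
    }
}
```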

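KafkaFuture is "a flexible future which supports call chaining and other asynchronous programming patterns" and "will eventually become a thin shim on top of Java 8's CompletableFuture".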
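
To make the difference between all() and values() concrete, here is an analogy built on the JDK's CompletableFuture rather than on KafkaFuture itself. The class PerTopicFuturesDemo, the all helper, and the topic names are assumptions made for illustration and are not part of the Kafka API. The map plays the role of what values() returns, and the combined future plays the role of all().

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class PerTopicFuturesDemo {

    // Combines per-topic futures into one future that completes once every
    // per-topic future has completed (and fails if any of them fails).
    static CompletableFuture<Void> all(Map<String, CompletableFuture<Void>> values) {
        return CompletableFuture.allOf(values.values().toArray(new CompletableFuture<?>[0]));
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<Void>> values = new LinkedHashMap<>();
        values.put("topic-a", CompletableFuture.completedFuture(null)); // already created
        values.put("topic-b", new CompletableFuture<>());               // still pending

        CompletableFuture<Void> all = all(values);
        System.out.println("before topic-b completes, all done? " + all.isDone());

        values.get("topic-b").complete(null);
        all.join(); // returns once every per-topic future has completed
        System.out.println("after topic-b completes, all done? " + all.isDone());
    }
}
```

Waiting on the combined future is enough when you only care whether everything succeeded. Iterating over the per-topic futures, as in the loop earlier in this answer, lets you report or tolerate failures topic by topic.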


Jay*_*ram 14

To create a topic through the Java API with Kafka 0.8+, try the following.

First add this import:

import kafka.utils.ZKStringSerializer$;

Then create the ZkClient object as follows:

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());


小智 10

You can try creating the topic from Java code with the kafka.admin.CreateTopicCommand Scala class, passing the necessary arguments.

String [] arguments = new String[8];
arguments[0] = "--zookeeper";
arguments[1] = "10.***.***.***:2181";
arguments[2] = "--replica";
arguments[3] = "1";
arguments[4] = "--partition";
arguments[5] = "1";
arguments[6] = "--topic";
arguments[7] = "test-topic-Biks";

CreateTopicCommand.main(arguments);

Note: You should add Maven dependencies for jopt-simple-4.5 and zkclient-0.1.
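
Filling the array index by index is easy to get wrong, so one option is to build it in a small helper. The sketch below is a JDK-only illustration: the class CreateTopicArgs and its build method are hypothetical, the flag names are copied from the snippet above, and "localhost:2181" is a placeholder address. It only constructs and prints the arguments. With Kafka 0.8.0 on the classpath, the resulting array would be handed to CreateTopicCommand.main as shown in this answer.

```java
import java.util.Arrays;

final class CreateTopicArgs {

    // Builds the flag/value pairs in the same order as the hand-written array.
    static String[] build(String zookeeper, int replicas, int partitions, String topic) {
        return new String[] {
            "--zookeeper", zookeeper,
            "--replica", Integer.toString(replicas),
            "--partition", Integer.toString(partitions),
            "--topic", topic
        };
    }

    public static void main(String[] args) {
        String[] arguments = build("localhost:2181", 1, 1, "test-topic-Biks");
        System.out.println(Arrays.toString(arguments));
    }
}
```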


For*_*ner 6

Based on the latest kafka-clients API and Kafka 2.1.1, a working version of the code is below.

Import the latest Kafka client with sbt:

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies ++= Seq("org.apache.kafka" % "kafka-clients" % "2.1.1",
"org.apache.kafka" %% "kafka" % "2.1.1")

Scala code to create the topic:

import java.util.Arrays
import java.util.Properties

import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

class CreateKafkaTopic {
  def create(): Unit = {
    val config = new Properties()
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.30.1.5:9092")

    val localKafkaAdmin = AdminClient.create(config)

    val partitions = 3
    val replication = 1.toShort
    val topic = new NewTopic("integration-02", partitions, replication)
    val topics = Arrays.asList(topic)

    val topicStatus = localKafkaAdmin.createTopics(topics).values()
    //topicStatus.values()
    println(topicStatus.keySet())
  }

}

Verify the new topic with:

./kafka-topics.sh --zookeeper 192.30.1.5:2181 --list

Hope it helps someone. Reference: http://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html


San*_*ket 0

Which IDE are you trying to use?

Please provide the full path. The following commands, run from a terminal, create the topic:

  1. cd kafka/bin
  2. ./kafka-create-topic.sh --topic test --zookeeper localhost:2181