我正在编写一个客户端来使用kafka 0.9.我想知道如何创建一个主题.答案:如何通过Java在Kafka中创建主题与我的要求类似.除此之外,该解决方案仅适用于Kafka 0.8.2,这与Kafka 0.9的API有很大不同.
我尝试跟随Soon Chee Loong对Kafka 0.9.0.1的回答,但不得不做出一个改变.ZKStringSerializer现在是私有的.要创建ZkUtils,我使用了以下API(它在内部创建了一个ZkClient):
ZkUtils.apply(
"zookeeper1:port1,zookeeper2:port2",
sessionTimeoutMs,
connectionTimeoutMs,
false)
Run Code Online (Sandbox Code Playgroud)
通过scala api和各种链接在线查看.
这是我找到的解决方案:
Maven依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.7</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
码:
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
public class KafkaJavaExample {
public static void main(String[] args) {
String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
int sessionTimeoutMs = 10 * 1000;
int connectionTimeoutMs = 8 * 1000;
ZkClient zkClient = new ZkClient(
zookeeperConnect,
sessionTimeoutMs,
connectionTimeoutMs,
ZKStringSerializer$.MODULE$);
// Security for Kafka was added in Kafka 0.9.0.0
boolean isSecureKafkaCluster = false;
// ZkUtils for Kafka was used in Kafka 0.9.0.0 for the AdminUtils API
ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
String topic = "my-topic";
int partitions = 2;
int replication = 3;
// Add topic configuration here
Properties topicConfig = new Properties();
AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig);
zkClient.close();
}
}
Run Code Online (Sandbox Code Playgroud)
如果您想知道为什么下面的代码看起来不像Java:
ZKStringSerializer$.MODULE$
Run Code Online (Sandbox Code Playgroud)
这是因为ZkStringSerializer是一个Scala对象.您可以在这里阅读更多相关信息:
如何用Java创建Kafka ZKStringSerializer?
注意:您必须使用ZKStringSerializer初始化ZkClient.
如果你不这样做,那么createTopic()似乎只能工作(换句话说:它将返回而不会出错).
该主题仅存在于Zookeeper中,仅在列出主题时有效.即下面的列表命令工作正常
bin/kafka-topics.sh --list --zookeeper localhost:2181
Run Code Online (Sandbox Code Playgroud)
但卡夫卡本身并没有创造这个话题.为了说明,下面的describe命令将引发错误.
bin/kafka-topics.sh --describe --zookeeper localhost:2181
Run Code Online (Sandbox Code Playgroud)
因此,请确保使用ZKStringSerializer $ .MODULE $初始化它.
参考资料: 我们如何使用来自-ide-using-api的API从IDE在Kafka中创建主题
不久,多伦多大学的Chee Loong
| 归档时间: |
|
| 查看次数: |
6519 次 |
| 最近记录: |