Tho*_*ter 4 python bash apache-kafka
我想创建一个Kafka主题,如果它尚不存在.我知道如何通过bash创建一个主题,但我不知道如何检查它是否存在.
topic_exists = ??????
if not topic_exists:
subprocess.call([os.path.join(KAFKABIN, 'kafka-topics.sh'),
'--create',
'--zookeeper', '{}:2181'.format(KAFKAHOST),
'--topic', str(self.topic),
'--partitions', str(self.partitions),
'--replication-factor', str(self.replication_factor)])
Run Code Online (Sandbox Code Playgroud)
另一个不错的方法是使用python kafka模块:
kafka_client = kafka.KafkaClient(kafka_server_name)
server_topics = kafka_client.topic_partitions
if topic_name in server_topics:
your code....
Run Code Online (Sandbox Code Playgroud)
kafka_client.topic_partitions返回主题列表。
您可以使用该--list (List all available topics)选项kafka-topics.sh并查看是否self.topic存在于topics数组中,如下所示.
根据您拥有的主题数量,这种方法可能会有点沉重.如果是这种情况,您可能可以使用--describe (List details for the given topics),如果主题不存在,可能会返回空.我没有对此进行彻底的测试,所以我不能肯定这个解决方案(--describe)是多么可靠,但是你可能需要进一步研究它.
wanted_topics = ['host_updates_queue', 'foo_bar']
topics = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'),
'--list',
'--zookeeper', '{}:2181'.format(KAFKAHOST)])
for wanted in wanted_topics:
if wanted in topics:
print '\'{}\' topic exists!'.format(wanted)
else:
print '\'{}\' topic does NOT exist!'.format(wanted)
topic_desc = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'),
'--describe',
'--topic', wanted,
'--zookeeper', '{}:2181'.format(KAFKAHOST)])
if not topic_desc:
print 'No description found for the topic \'{}\''.format(wanted)
Run Code Online (Sandbox Code Playgroud)
OUTPUT:
root@dev:/opt/kafka/kafka_2.10-0.8.2.1# ./t.py
'host_updates_queue' topic exists!
'foo_bar' topic does NOT exist!
No description found for the topic 'foo_bar'
Run Code Online (Sandbox Code Playgroud)
还有一个Broker配置可用,因此您无需执行以下任何步骤:
auto.create.topics.enable | 是的| 在服务器上启用主题的自动创建.如果将此设置为true,则尝试为不存在的主题生成数据或获取元数据将自动使用默认复制因子和分区数创建它.
如果可能的话,我会采用这种方法.
请注意,您应server.properties在代理上设置主题configs()num.partitions,default.replication.factor以匹配代码段中的设置.
| 归档时间: |
|
| 查看次数: |
4212 次 |
| 最近记录: |