我想通过java在Kafka(kafka_2.8.0-0.8.1.1)中创建一个主题.如果我在命令提示符下创建一个主题,并且如果我通过java api推送消息,它工作正常.但我想通过java api创建一个主题.经过长时间的搜索,我发现下面的代码,
ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
Run Code Online (Sandbox Code Playgroud)
我尝试了上面的代码,它显示主题已创建,但我无法在主题中推送消息.我的代码中有什么问题吗?或者以其他任何方式实现上述目标?
我正在尝试构建一个以 Kafka 作为界面的 Flask 应用程序。我使用了 Python 连接器kafka-python和用于 Kafka 的 Docker 映像spotify/kafkaproxy。
下面是 docker-compose 文件。
version: '3.3'
services:
kafka:
image: spotify/kafkaproxy
container_name: kafka_dev
ports:
- '9092:9092'
- '2181:2181'
environment:
- ADVERTISED_HOST=0.0.0.0
- ADVERTISED_PORT=9092
- CONSUMER_THREADS=1
- TOPICS=PROFILE_CREATED,IMG_RATED
- ZK_CONNECT=kafka7zookeeper:2181/root/path
flaskapp:
build: ./flask-app
container_name: flask_dev
ports:
- '9000:5000'
volumes:
- ./flask-app:/app
depends_on:
- kafka
Run Code Online (Sandbox Code Playgroud)
下面是我用来连接到 kafka 的 Python 片段。在这里,我使用 Kafka 容器的别名kafka进行连接,因为 Docker 会负责将别名映射到它的 IP 地址。
from kafka import KafkaConsumer, KafkaProducer
TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = …Run Code Online (Sandbox Code Playgroud)