我一直在尝试构建一个以 Kafka 作为唯一界面的 Flask 应用程序。出于这个原因,我想要一个 Kafka 消费者,当相关主题的流中有新消息时触发它,并通过将消息推回 Kafka 流来响应。
我一直在寻找类似 Spring 实现的东西:
@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message) {
System.out.println("Received Messasge in group mygroup: " + message);
}
Run Code Online (Sandbox Code Playgroud)
我看过:
但是我在 Python 中找不到与事件驱动的实现风格相关的任何内容。
我正在尝试构建一个以 Kafka 作为界面的 Flask 应用程序。我使用了 Python 连接器kafka-python和用于 Kafka 的 Docker 映像spotify/kafkaproxy。
下面是 docker-compose 文件。
version: '3.3'
services:
kafka:
image: spotify/kafkaproxy
container_name: kafka_dev
ports:
- '9092:9092'
- '2181:2181'
environment:
- ADVERTISED_HOST=0.0.0.0
- ADVERTISED_PORT=9092
- CONSUMER_THREADS=1
- TOPICS=PROFILE_CREATED,IMG_RATED
- ZK_CONNECT=kafka7zookeeper:2181/root/path
flaskapp:
build: ./flask-app
container_name: flask_dev
ports:
- '9000:5000'
volumes:
- ./flask-app:/app
depends_on:
- kafka
Run Code Online (Sandbox Code Playgroud)
下面是我用来连接到 kafka 的 Python 片段。在这里,我使用 Kafka 容器的别名kafka
进行连接,因为 Docker 会负责将别名映射到它的 IP 地址。
from kafka import KafkaConsumer, KafkaProducer
TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = …
Run Code Online (Sandbox Code Playgroud)