在 dockerized 环境中无法从 Flask 连接到 Kafka

Sha*_*ank 5 python connection flask apache-kafka docker

我正在尝试构建一个以 Kafka 作为界面的 Flask 应用程序。我使用了 Python 连接器kafka-python和用于 Kafka 的 Docker 映像spotify/kafkaproxy

下面是 docker-compose 文件。

version: '3.3'
services:
  kafka:
    image: spotify/kafkaproxy
    container_name: kafka_dev
    ports:
      - '9092:9092'
      - '2181:2181'
    environment:
      - ADVERTISED_HOST=0.0.0.0
      - ADVERTISED_PORT=9092
      - CONSUMER_THREADS=1
      - TOPICS=PROFILE_CREATED,IMG_RATED
      - ZK_CONNECT=kafka7zookeeper:2181/root/path
  flaskapp:
    build: ./flask-app
    container_name: flask_dev
    ports:
      - '9000:5000'
    volumes:
      - ./flask-app:/app
    depends_on:
      - kafka
Run Code Online (Sandbox Code Playgroud)

下面是我用来连接到 kafka 的 Python 片段。在这里,我使用 Kafka 容器的别名kafka进行连接,因为 Docker 会负责将别名映射到它的 IP 地址。

from kafka import KafkaConsumer, KafkaProducer

TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:9092']

consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)
Run Code Online (Sandbox Code Playgroud)

我有NoBrokersAvailable错误。由此,我可以理解 Flask 应用程序找不到 Kafka 服务器。

Traceback (most recent call last):
  File "./app.py", line 11, in <module>
    consumer = KafkaConsumer("PROFILE_CREATED", bootstrap_servers=BOOTSTRAP_SERVERS)
  File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 340, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 219, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 819, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
Run Code Online (Sandbox Code Playgroud)

其他观察:

  1. 我能够ping kafka从 Flask 容器运行并从 Kafka 容器获取数据包。
  2. 当我在本地运行 Flask 应用程序时,尝试通过设置连接到 Kafka 容器BOOTSTRAP_SERVERS = ['localhost:9092'],它工作正常。

llo*_*ono 1

更新

正如 cricket_007 所提到的,鉴于您正在使用下面提供的 docker-compose,您应该使用它kafka:29092从另一个容器连接到 Kafka。所以你的代码看起来像这样:

from kafka import KafkaConsumer, KafkaProducer

TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:29092']

consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)
Run Code Online (Sandbox Code Playgroud)

结束更新

我建议您使用Confluence Inc的 Kafka 图像,他们有各种使用 docker-compose 的示例设置,可供使用,并且总是在更新它们。

试试这个:

---
version: '2'
services:
zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000

kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
    - zookeeper
    ports:
    - 9092:9092
    environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

flaskapp:
    build: ./flask-app
    container_name: flask_dev
    ports:
    - '9000:5000'
    volumes:
    - ./flask-app:/app
Run Code Online (Sandbox Code Playgroud)

我使用了这个docker-compose.yml并在顶部添加了您的服务请注意:

这里使用的配置公开了端口 9092,用于与代理的外部连接,即来自docker 网络外部的连接。这可能来自运行 docker 的主机,或者如果您有更复杂的设置,则可能来自更远的地方。如果后者为真,您将需要将 KAFKA_ADVERTISED_LISTENERS 中的值“localhost”更改为可从这些远程客户端解析为 docker 主机的值

确保您查看其他示例,可能对您有用,尤其是在迁移到生产环境时:https://github.com/confluenceinc/cp-docker-images/tree/5.0.1-post/examples

还值得检查:

看来您需要指定 api_version 以避免此错误。欲了解更多详情,请查看此处

该库的 1.3.5 版本(pypy 上的最新版本)仅列出了某些 API 版本 0.8.0 到 0.10.1。因此,除非您明确指定 api_version 为 (0, 10, 1),否则客户端库尝试发现版本将导致 NoBrokersAvailable 错误。

producer = KafkaProducer(
    bootstrap_servers=URL,
    client_id=CLIENT_ID,
    value_serializer=JsonSerializer.serialize,
    api_version=(0, 10, 1)
)
Run Code Online (Sandbox Code Playgroud)

这应该可行,有趣的是,设置 api_version 意外地修复了问题:

当您设置 api_version 时,客户端将不会尝试探测代理以获取版本信息。因此,探测操作失败了。版本探测连接和一般连接之间的一个很大的区别是,前者仅尝试在每个连接(每个代理)的单个接口上进行连接,而后者(一般操作)将不断循环所有接口,直到连接为止成功了。#1411 通过切换版本探测逻辑以尝试在所有找到的接口上进行连接来修复此问题。

实际问题描述如下