Docker Kafka with a Python consumer

Pan*_*boo 4 python apache-kafka docker kafka-consumer-api docker-compose

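I am using dockerized Kafka and have written a Kafka consumer program. It works perfectly when I run Kafka in Docker and the application on my local machine. But when I run the application in Docker as well, I run into problems. The issue may be that the topic has not yet been created when the application starts.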

docker-compose.yml

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  parse-engine:
    build: .
    depends_on:
      - "kafka"
    command: python parse-engine.py
    ports:
     - "5000:5000"

parse-engine.py

from kafka import KafkaConsumer
import json

try:
    print('Welcome to parse engine')
    consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092')
    for message in consumer:
        print(message)
except Exception as e:
    print(e)
    # Logs the error appropriately. 
    pass

Error log

kafka_1         | [2018-09-21 06:27:17,400] INFO [SocketServer brokerId=1001] Started processors for 1 acceptors (kafka.network.SocketServer)
kafka_1         | [2018-09-21 06:27:17,404] INFO Kafka version : 2.0.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1         | [2018-09-21 06:27:17,404] INFO Kafka commitId : 3402a8361b734732 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1         | [2018-09-21 06:27:17,431] INFO [KafkaServer id=1001] started (kafka.server.KafkaServer)
**parse-engine_1  | Welcome to parse engine
parse-engine_1  | NoBrokersAvailable 
parseengine_parse-engine_1 exited with code 0**
kafka_1         | creating topics: test:1:1

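I have already added the depends_on property in docker-compose, but the application tries to connect before the topic has been created, so the error occurs.

I read that I could add a script to the docker-compose file, but I am looking for a simpler way.

Thanks for any help.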

Rob*_*att 30

Your problem is the networking. In your Kafka config you are setting

KAFKA_ADVERTISED_HOST_NAME: localhost

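But this means that any client (including your python app) will connect to the broker and then be told by the broker to use localhost for any further connections. Since localhost as seen from the client machine (e.g. your python container) is not where the broker is, those requests will fail.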
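The two-step handshake described above can be illustrated with a toy model. This is only a sketch to make the reasoning concrete, not how a Kafka client is implemented: `simulate_connect` and the `reachable` set are hypothetical names, and real clients speak the Kafka protocol rather than comparing strings.

```python
def simulate_connect(bootstrap, advertised, reachable):
    """Toy model of a Kafka client's two-step connection.

    reachable is the set of 'host:port' strings on which, from the
    client's point of view, the broker is actually listening.
    """
    # Step 1: the client contacts the bootstrap address to fetch metadata.
    if bootstrap not in reachable:
        return "failed: bootstrap address unreachable"
    # Step 2: the broker replies with its advertised address, and the
    # client uses that address for all further traffic.
    if advertised not in reachable:
        return "failed: advertised address unreachable"
    return "connected via " + advertised


# Seen from the parse-engine container, only the service name reaches the
# broker; 'localhost:9092' there is the parse-engine container itself.
from_container = {"kafka:9092"}

# The setup from the question: bootstrap and advertised name are both localhost.
print(simulate_connect("localhost:9092", "localhost:9092", from_container))
# Fixing only the client is not enough while the broker advertises localhost.
print(simulate_connect("kafka:9092", "localhost:9092", from_container))
# Both the client address and the advertised name point at the kafka service.
print(simulate_connect("kafka:9092", "kafka:9092", from_container))
```

The point of the model is that both addresses have to be resolvable from wherever the client runs, which is why changing only `bootstrap_servers` or only the advertised name is not sufficient.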

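You can read more about Kafka listeners here: https://rmoff.net/2018/08/02/kafka-listeners-explained/

So to fix your issue, you can do one of two things:

  1. Simply change your compose file to use the internal hostname for Kafka (KAFKA_ADVERTISED_HOST_NAME: kafka). This means any client within the docker network will be able to access it fine, but no external client will be able to (e.g. from your host machine):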

    version: '3'
    services:
      zookeeper:
        image: wurstmeister/zookeeper
        ports:
          - "2181:2181"
      kafka:
        image: wurstmeister/kafka
        ports:
          - "9092:9092"
        environment:
          KAFKA_ADVERTISED_HOST_NAME: kafka
          KAFKA_CREATE_TOPICS: "test:1:1"
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
      parse-engine:
        build: .
        depends_on:
          - "kafka"
        command: python parse-engine.py
        ports:
          - "5000:5000"
    

    Your clients would then access the broker at kafka:9092, so your python app would change to

    consumer = KafkaConsumer('test', bootstrap_servers='kafka:9092')
    
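  2. Add a new listener to Kafka. This makes it accessible both from inside and from outside the docker network. Port 29092 is for access from outside the docker network (e.g. from your host), and 9092 is for internal access.

    You still need to change your python program to access Kafka at the correct address. In this case, since it is internal to the Docker network, you would use: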

    consumer = KafkaConsumer('test', bootstrap_servers='kafka:9092')
    

    Since I am not familiar with the wurstmeister images, this docker-compose is based on the Confluent images, which I do know:

    (The editor mangled my YAML; you can find it here)

    ---
    version: '2'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:latest
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
    
      kafka:
        # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
        # An important note about accessing Kafka from clients on other machines: 
        # -----------------------------------------------------------------------
        #
        # The config used here exposes port 29092 for _external_ connections to the broker
        # i.e. those from _outside_ the docker network. This could be from the host machine
        # running docker, or maybe further afield if you've got a more complicated setup. 
        # If the latter is true, you will need to change the value 'localhost' in 
        # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those 
        # remote clients
        #
        # For connections _internal_ to the docker network, such as from other services
        # and components, use kafka:9092.
        #
        # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
        # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
        #
        image: confluentinc/cp-kafka:latest
        depends_on:
          - zookeeper
        ports:
          - 29092:29092
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
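With two listeners, the same consumer code needs a different address depending on where it runs. One possible way to handle that, sketched here under assumptions, is to read the address from an environment variable. `KAFKA_BOOTSTRAP_SERVERS` and `pick_bootstrap_servers` are hypothetical names chosen for this example; they are not something the Confluent or wurstmeister images define.

```python
import os


def pick_bootstrap_servers(environ=os.environ):
    """Return the broker address for the current environment.

    KAFKA_BOOTSTRAP_SERVERS is a hypothetical variable name. The fallback
    is the external listener from the compose file above, which suits a
    client running directly on the docker host.
    """
    return environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:29092")


# Inside the docker network the compose file would set
# KAFKA_BOOTSTRAP_SERVERS=kafka:9092 for the parse-engine service.
print(pick_bootstrap_servers({"KAFKA_BOOTSTRAP_SERVERS": "kafka:9092"}))
# On the host, with the variable unset, the external listener is used.
print(pick_bootstrap_servers({}))

# Usage with kafka-python (not executed here):
# consumer = KafkaConsumer('test', bootstrap_servers=pick_bootstrap_servers())
```

This keeps the address out of the source file, so the same parse-engine.py could run on the host during development and in a container under compose.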
    

Disclaimer: I work for Confluent

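  • Wouldn't it be better to use the docker host's linux hostname (i.e. the value of `$(hostname)` in bash) instead of `localhost`, to enable clients on machines other than the docker host? (As shown in the blog post you reference) (2 upvotes)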

cri*_*007 6

This line

KAFKA_ADVERTISED_HOST_NAME: localhost

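says that the broker advertises itself as being available only on localhost, which means every Kafka client gets back its own address rather than the actual list of real broker addresses. That would be fine if your clients ran only on your host: requests always go to localhost, which is forwarded to the container.

For apps in other containers, however, the clients need to point at the Kafka container, so it should say KAFKA_ADVERTISED_HOST_NAME: kafka, where kafka is the name of the Docker Compose service. Clients in other containers will then try to connect to that container.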
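A quick way to see which address is reachable from inside a container is a plain TCP check with the standard library. This is a diagnostic sketch only; `can_connect` is a hypothetical helper, and a successful TCP connection confirms just that the bootstrap address is reachable, not that the advertised listener is configured correctly.

```python
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the name and connects; DNS failures,
        # refused connections and timeouts all surface as OSError.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Run inside the parse-engine container. With the compose file from the
    # question, the first check would be expected to succeed and the second
    # to fail, because localhost is the parse-engine container itself.
    for host in ("kafka", "localhost"):
        print(host, can_connect(host, 9092))
```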


That being said, this line

consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092')

points the Python container at itself, not at the kafka container.

It should say kafka:9092 instead.
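Even with the address corrected, `depends_on` only orders container startup and does not wait for the broker to accept connections, so the consumer may still start too early. A small retry loop in the application is one way to cope with that without adding a wait script to the compose file. The sketch below is an assumption-laden example, not part of kafka-python: `connect_with_retry` and its parameters are hypothetical names, and the helper is kept generic so it runs without a broker.

```python
import time


def connect_with_retry(factory, retry_on=(Exception,), attempts=10,
                       delay=3.0, sleep=time.sleep):
    """Call factory() until it succeeds or attempts are exhausted.

    factory  -- zero-argument callable that creates the client
    retry_on -- exception types that mean "not ready yet, try again"
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return factory()
        except retry_on as error:
            last_error = error
            print(f"attempt {attempt}/{attempts} failed: {error!r}")
            if attempt < attempts:
                sleep(delay)
    # Every attempt failed: re-raise the last error so the caller sees it.
    raise last_error


# Usage with kafka-python (not executed here):
# from kafka import KafkaConsumer
# from kafka.errors import NoBrokersAvailable
# consumer = connect_with_retry(
#     lambda: KafkaConsumer('test', bootstrap_servers='kafka:9092'),
#     retry_on=(NoBrokersAvailable,),
# )
```

Restricting `retry_on` to `NoBrokersAvailable` means that unrelated errors, such as a typo in an argument, still fail immediately instead of being retried.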