Pan*_*boo 4 python apache-kafka docker kafka-consumer-api docker-compose
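I am using dockerized Kafka and have written a Kafka consumer program. It works perfectly when I run Kafka in Docker and the application on my local machine. But when I run the application in Docker as well, I run into problems. The issue may be that the topic has not yet been created when the application starts.
docker-compose.yml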
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  parse-engine:
    build: .
    depends_on:
      - "kafka"
    command: python parse-engine.py
    ports:
      - "5000:5000"
parse-engine.py
from kafka import KafkaConsumer
import json
try:
    print('Welcome to parse engine')
    consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092')
    for message in consumer:
        print(message)
except Exception as e:
    print(e)
    # Logs the error appropriately.
    pass
Error log
kafka_1 | [2018-09-21 06:27:17,400] INFO [SocketServer brokerId=1001] Started processors for 1 acceptors (kafka.network.SocketServer)
kafka_1 | [2018-09-21 06:27:17,404] INFO Kafka version : 2.0.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1 | [2018-09-21 06:27:17,404] INFO Kafka commitId : 3402a8361b734732 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1 | [2018-09-21 06:27:17,431] INFO [KafkaServer id=1001] started (kafka.server.KafkaServer)
**parse-engine_1 | Welcome to parse engine
parse-engine_1 | NoBrokersAvailable
parseengine_parse-engine_1 exited with code 0**
kafka_1 | creating topics: test:1:1
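I have already added the depends_on property in docker-compose, but the application tries to connect before the topic has been created, so the error occurs.
I read that I can add a script to the docker-compose file, but I am looking for a simpler way.
Thanks for the help.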
Rob*_*att 30
Your problem is a networking one. In your Kafka configuration you are setting
KAFKA_ADVERTISED_HOST_NAME: localhost
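But this means that any client (including your Python app) will connect to the broker and then be told by the broker to use localhost for any connections. Since localhost on the client machine (e.g. your Python container) is not where the broker is, requests will fail.
You can read more about Kafka listeners in detail here: https://rmoff.net/2018/08/02/kafka-listeners-explained/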
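To make the two-step connection concrete, here is a toy model in plain Python. It makes no network calls; the `Network` class, `connect`, `try_connect`, and the client labels are all hypothetical and only mimic the behaviour described above: a client first dials the bootstrap address, then dials whatever address the broker advertises.

```python
# Toy model of Kafka's two-step connection. Nothing here talks to a real broker;
# every name below is made up for illustration.

class Network:
    """Maps (client location, address) to whatever is listening there."""

    def __init__(self, listeners):
        self.listeners = listeners

    def dial(self, client, address):
        target = self.listeners.get((client, address))
        if target is None:
            raise ConnectionError(f"{client}: nothing listening at {address}")
        return target


def connect(network, client, bootstrap, advertised):
    """Step 1: reach the bootstrap address. Step 2: reach the advertised one."""
    network.dial(client, bootstrap)          # initial metadata request
    return network.dial(client, advertised)  # all further traffic goes here


# Inside the Compose network the broker is reachable as kafka:9092.
# On the host, the published port makes it reachable as localhost:9092.
network = Network({
    ("python-container", "kafka:9092"): "broker",
    ("host", "localhost:9092"): "broker",
})


def try_connect(client, bootstrap, advertised):
    try:
        return connect(network, client, bootstrap, advertised)
    except ConnectionError as exc:
        return f"failed: {exc}"


# Original setup: the container bootstraps against its own localhost.
print(try_connect("python-container", "localhost:9092", "localhost:9092"))
# Bootstrap fixed, but the broker still advertises localhost.
print(try_connect("python-container", "kafka:9092", "localhost:9092"))
# Both the bootstrap and the advertised address use the service name.
print(try_connect("python-container", "kafka:9092", "kafka:9092"))
# A client on the host works with localhost throughout.
print(try_connect("host", "localhost:9092", "localhost:9092"))
```

In this model only the last two calls reach the broker, which is why both the advertised host name and the client's `bootstrap_servers` have to be resolvable from wherever the client runs.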
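So to fix your issue, you can do one of two things:
Simply change your compose file to use the internal hostname for Kafka (KAFKA_ADVERTISED_HOST_NAME: kafka). This means that any client inside the Docker network will be able to access it fine, but no external client will (e.g. one on your host machine):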
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  parse-engine:
    build: .
    depends_on:
      - "kafka"
    command: python parse-engine.py
    ports:
      - "5000:5000"
Your clients would then access the broker at kafka:9092, so your Python app would change to
consumer = KafkaConsumer('test', bootstrap_servers='kafka:9092')
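Add a new listener to Kafka. This makes it accessible both inside and outside the Docker network. Port 29092 is for access from outside the Docker network (e.g. from your host), and 9092 is for internal access.
You would still need to change your Python program to access Kafka at the correct address. In this case, since it is internal to the Docker network, you would use: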
consumer = KafkaConsumer('test', bootstrap_servers='kafka:9092')
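One way to avoid hard-coding the address is to read it from the environment, so the same script can use kafka:9092 inside the Docker network and localhost:29092 from the host. A minimal sketch, assuming an environment variable named `KAFKA_BOOTSTRAP_SERVERS` (a name chosen here for illustration, not something kafka-python reads on its own) and a hypothetical helper `bootstrap_servers`:

```python
import os


def bootstrap_servers(default="kafka:9092"):
    """Return broker addresses from KAFKA_BOOTSTRAP_SERVERS (hypothetical name).

    Falls back to `default` when the variable is unset or empty.
    Accepts a comma-separated list such as "kafka:9092,other:9092".
    """
    raw = os.environ.get("KAFKA_BOOTSTRAP_SERVERS") or default
    return [addr.strip() for addr in raw.split(",") if addr.strip()]


print(bootstrap_servers())

# In parse-engine.py this could be used as:
#   consumer = KafkaConsumer('test', bootstrap_servers=bootstrap_servers())
```

kafka-python accepts either a single 'host:port' string or a list of them for `bootstrap_servers`, so the returned list can be passed through unchanged. The variable itself would be set under `environment:` for the parse-engine service in the compose file.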
Since I am not familiar with the wurstmeister images, this docker-compose file is based on the Confluent images, which I do know:
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    # An important note about accessing Kafka from clients on other machines:
    # -----------------------------------------------------------------------
    #
    # The config used here exposes port 29092 for _external_ connections to the broker
    # i.e. those from _outside_ the docker network. This could be from the host machine
    # running docker, or maybe further afield if you've got a more complicated setup.
    # If the latter is true, you will need to change the value 'localhost' in
    # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
    # remote clients
    #
    # For connections _internal_ to the docker network, such as from other services
    # and components, use kafka:9092.
    #
    # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    #
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
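To see how the two listener settings fit together, the following sketch parses strings in the same format as KAFKA_ADVERTISED_LISTENERS and KAFKA_LISTENER_SECURITY_PROTOCOL_MAP and prints the address each named listener advertises. `parse_listeners` is a hypothetical helper written for this illustration; it is not part of Kafka or of any client library.

```python
def parse_listeners(advertised, protocol_map):
    """Split the listener strings into {name: {host, port, protocol}}."""
    # "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT" -> {"PLAINTEXT": "PLAINTEXT", ...}
    protocols = dict(item.split(":", 1) for item in protocol_map.split(","))
    result = {}
    for entry in advertised.split(","):
        name, address = entry.split("://", 1)   # "PLAINTEXT", "kafka:9092"
        host, port = address.rsplit(":", 1)     # "kafka", "9092"
        result[name] = {
            "host": host,
            "port": int(port),
            "protocol": protocols[name],
        }
    return result


listeners = parse_listeners(
    "PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092",
    "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT",
)
for name, info in listeners.items():
    print(name, info)
```

With the values from the compose file, the listener named PLAINTEXT advertises kafka:9092 for clients inside the Docker network and PLAINTEXT_HOST advertises localhost:29092 for clients on the host; both use the PLAINTEXT security protocol.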
Disclaimer: I work for Confluent
This line
KAFKA_ADVERTISED_HOST_NAME: localhost
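says that the broker advertises itself as being available only on localhost, which means every Kafka client is handed back an address that points at itself rather than the real list of broker addresses. This would be fine if your clients ran only on your host - requests always go to localhost, which is forwarded to the container.
But apps in other containers need to point at the Kafka container, so it should say KAFKA_ADVERTISED_HOST_NAME: kafka, where kafka is the name of the Docker Compose service. Clients in other containers would then try to connect to that container.
That being said, with this line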
consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092')
you are pointing the Python container at itself, not at the kafka container.
It should say kafka:9092 instead.
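Separately from the address fix, the question's start-up ordering concern can be handled in the client: depends_on only orders container start-up and does not wait for the broker to accept connections. A minimal sketch of a retry loop with exponential backoff follows; `connect_with_retry` and its parameters are hypothetical names, and the demo uses a fake `flaky` factory so that it runs without a broker.

```python
import time


def connect_with_retry(factory, retry_on, attempts=10, delay=1.0, sleep=time.sleep):
    """Call factory() until it succeeds or `attempts` calls have failed.

    `retry_on` is the exception type (or tuple of types) worth retrying.
    The wait doubles after each failure, capped at 30 seconds.
    """
    for attempt in range(1, attempts + 1):
        try:
            return factory()
        except retry_on as exc:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the last error
            print(f"attempt {attempt} failed ({exc!r}); retrying in {delay:.1f}s")
            sleep(delay)
            delay = min(delay * 2, 30.0)


# Demo with a stand-in factory that fails twice before succeeding.
calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("broker not ready")
    return "connected"


print(connect_with_retry(flaky, ConnectionError, sleep=lambda seconds: None))

# With kafka-python the same helper could wrap the consumer, for example:
#   from kafka import KafkaConsumer
#   from kafka.errors import NoBrokersAvailable
#   consumer = connect_with_retry(
#       lambda: KafkaConsumer('test', bootstrap_servers='kafka:9092'),
#       retry_on=NoBrokersAvailable,
#   )
```

Retrying only on the specific connection error, rather than on every exception, keeps configuration mistakes such as a wrong host name visible once the attempts run out.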