Docker中的Kafka无法正常工作

nan*_*nue 47 apache-kafka docker docker-compose kafka-python

我正在尝试使用wurstmeister\kafka-docker图像 docker-compose,但我在连接所有内容方面遇到了实际问题.

我检查的所有帖子或问题,似乎没有任何问题,但我坦率地失去了.(并且在SO中至少有两个问题试图解决这个问题)

我认为问题在于我对网络的了解不足docker.所以问题是:

我可以从同一个kafka容器中使用和生产,但是,当我尝试创建另一个容器(或者使用我的笔记本电脑和python客户端)时,我得到了几个与advertised.host.name参数相关的错误(在图像中这个参数是KAFKA_ADVERTISED_HOST_NAME)

我已经尝试过多种方式设置这个变量,但它根本不起作用.

所以我正在寻找一个明确的答案(即如何自动设置这些参数及其含义)如何设置 docker-compose.yml

这是我的:

zookeeper:
  image: wurstmeister/zookeeper
  ports:
    - "2181:2181"

kafka:
  image: wurstmeister/kafka
 # hostname: kafka
  ports:
    - "9092"
  links:
    - zookeeper:zk
  environment:
    KAFKA_ADVERTISED_HOST_NAME: "kafka"
    KAFKA_ADVERTISED_PORT: "9092"
    KAFKA_ZOOKEEPER_CONNECT: "zk:2181"
Run Code Online (Sandbox Code Playgroud)

UPDATE

按照@dnephin的建议,我修改start-kafka.sh了以下几行:

...
if [[ -z "$KAFKA_ADVERTISED_PORT" ]]; then
    export KAFKA_ADVERTISED_PORT=$(hostname -i)
fi
...
Run Code Online (Sandbox Code Playgroud)

KAFKA_ADVERTISED_HOST_NAME: "kafka"从中删除docker-compose.yml

我以规范的方式启动了容器:

docker-compose up -d
Run Code Online (Sandbox Code Playgroud)

两个容器都在运行:

$ docker-compose ps
           Name                          Command               State                     Ports                    
-----------------------------------------------------------------------------------------------------------------
infraestructura_kafka_1       start-kafka.sh                   Up      0.0.0.0:32768->9092/tcp                    
infraestructura_zookeeper_1   /opt/zookeeper/bin/zkServe ...   Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp 
Run Code Online (Sandbox Code Playgroud)

后来我做了:

docker-compose logs
Run Code Online (Sandbox Code Playgroud)

一切顺利.

用于检查IP地址:

$ KAFKA_IP=$(docker inspect --format '{{ .NetworkSettings.IPAddress }}' infraestructura_kafka_1)                                                                                                            
$ echo $KAFKA_IP
172.17.0.4

and

$ ZK_IP=$(docker inspect --format '{{ .NetworkSettings.IPAddress }}' infraestructura_zookeeper_1)                                                                                                           
$ echo $ZK_IP 
172.17.0.3
Run Code Online (Sandbox Code Playgroud)

然后我执行两个不同的控制台:

制片人:

$ docker run --rm --interactive wurstmeister/kafka /opt/kafka_2.11-0.9.0.1/bin/kafka-console-producer.sh --topic grillo --broker-list 171.17.0.4:9092  
Run Code Online (Sandbox Code Playgroud)

消费者:

$ docker run --rm --interactive  wurstmeister/kafka /opt/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --topic grillo --from-beginning --zookeeper 172.17.0.3:2181 
Run Code Online (Sandbox Code Playgroud)

几乎立即,警告开始在整个屏幕上飞行:

[2016-03-11 00:39:17,010] WARN Fetching topic metadata with correlation id 0 for topics [Set(grillo)] from broker [BrokerEndPoint(1001,ba53d4fd7595,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-03-11 00:39:17,013] WARN [console-consumer-79688_9dd5f575d557-1457656747003-f1ed369d-leader-finder-thread], Failed to find leader for Set([grillo,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFin
derThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(grillo)] from broker [ArrayBuffer(BrokerEndPoint(1001,ba53d4fd7595,9092))] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        ... 3 more
Run Code Online (Sandbox Code Playgroud)

等等

在制作人的控制台中,我写了一些句子:

$ docker run --rm --interactive klustera/kafka /opt/kafka_2.11-0.9.0.1/bin/kafka-console-producer.sh --topic grillo --broker-list 171.17.0.4:9092                                                           
Hola
¿Cómo está??
¿Todo bien?
Run Code Online (Sandbox Code Playgroud)

过了一会儿,我收到了这样的答复:

[2016-03-11 00:39:28,955] ERROR Error when sending message to topic grillo with key: null, value: 4 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
[2016-03-11 00:40:28,956] ERROR Error when sending message to topic grillo with key: null, value: 16 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
[2016-03-11 00:41:28,956] ERROR Error when sending message to topic grillo with key: null, value: 12 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
Run Code Online (Sandbox Code Playgroud)

并在 docker-compose logs

...
zookeeper_1 | 2016-03-11 00:39:07,072 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@651] - Got user-level KeeperException when processing sessionid:0x153631368b1000b type:create c
xid:0x2 zxid:0x47 txntype:-1 reqpath:n/a Error Path:/consumers Error:KeeperErrorCode = NodeExists for /consumers
zookeeper_1 | 2016-03-11 00:39:07,243 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@651] - Got user-level KeeperException when processing sessionid:0x153631368b1000b type:create c
xid:0x19 zxid:0x4b txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-79688/owners/grillo Error:KeeperErrorCode = NoNode for /consumers/console-consumer-79688/owners/grillo
zookeeper_1 | 2016-03-11 00:39:07,247 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@651] - Got user-level KeeperException when processing sessionid:0x153631368b1000b type:create $xid:0x1a zxid:0x4c txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-79688/owners Error:KeeperErrorCode = NoNode for /consumers/console-consumer-79688/owners
...
Run Code Online (Sandbox Code Playgroud)

更新2

我至少在docker-machine以下方面做到了:

首先,我定义了一个名为的变量docker-machine:

DOCKER_VM=kafka_test
Run Code Online (Sandbox Code Playgroud)

然后,我修改docker-compose.yml如下:

KAFKA_ADVERTISED_HOST_NAME: "${DOCKER_MACHINE_IP}"
Run Code Online (Sandbox Code Playgroud)

最后,在环境中docker-machine,我执行:

DOCKER_MACHINE_IP=$(docker-machine ip $DOCKER_VM) docker-compose up -d
Run Code Online (Sandbox Code Playgroud)

但是在笔记本电脑中(我的意思是,不使用虚拟机,它不起作用)

rad*_*1st 24

我对这个问题的解决方案略有不同.我卡夫卡配置上做广告kafka主机和,因为它是在主机上暴露localhost:9092,我在添加一个条目/etc/hosts用于kafka解析到localhost.通过这样做,可以从其他Docker容器和localhost访问Kafka.

泊坞窗,compose.yml:

  my-web-service:
    build: ./my-web-service
    ports:
     - "8000:8000"
    links:
     - kafka
  kafka:
    image: "wurstmeister/kafka:0.10.2.0"
    ports:
     - "9092:9092"
    hostname: kafka
    links: 
     - zookeeper
    environment:
     - KAFKA_ADVERTISED_HOST_NAME=kafka
     - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
     - KAFKA_ADVERTISED_PORT=9092
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
Run Code Online (Sandbox Code Playgroud)

更新的主机文件:

more /etc/hosts
127.0.0.1       localhost kafka
Run Code Online (Sandbox Code Playgroud)


Sha*_*kil 6

要在localhost中开发应用程序,文档中有一个解决方案:"HOSTNAME_COMMAND"

kafka:
  image: wurstmeister/kafka
  ports:
    - 9092:9092
environment:
  KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
Run Code Online (Sandbox Code Playgroud)

希望这有助于他人......


dne*_*hin 5

我相信您使用的值KAFKA_ADVERTISED_HOST_NAME会根据容器的到达方式而变化。

如果您尝试从另一个容器进行连接,则 usingkafka应该是正确的(只要您使用将该名称设置为链接别名)。

如果您尝试从主机进行连接,则该名称将不起作用。您需要使用容器 IP 地址,您可以使用docker inspect. 但是,容器 IP 地址会发生变化,因此最好使用$(hostname -i)检索它的方法从容器内部设置它。