Connecting a .NET application to Kafka running in Docker

Ave*_*mer 2 c# apache-kafka docker

I'm running Kafka in Docker and have a .NET application that I want to use to consume messages. I followed the tutorials below, but had no luck:
https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
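Connect to Kafka running in Docker
Interact with kafka docker container from outside docker host
In my consumer application, if I try to connect directly to the container IP, I get the following error: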

172.21.0.3:9092/bootstrap: Connect to ipv4#172.21.0.3:9092 failed: Unknown error (after 21502ms in state CONNECT)
Error: 1/1 brokers are down %3|1620652406.633|FAIL|rdkafka#consumer-1| [thrd:172.21.0.3:9092/bootstrap]: 172.21.0.3:9092/bootstrap: Connect to ipv4#172.21.0.3:9092 failed: Unknown error (after 21037ms in state CONNECT, 1 identical error(s) suppressed) 
Error: 172.21.0.3:9092/bootstrap: Connect to ipv4#172.21.0.3:9092 failed: Unknown error (after 21037ms in state CONNECT, 1 identical error(s) suppressed)

If I change BootstrapServers to kafka:9092, I get this error:

Error: kafka:9092/bootstrap: Failed to resolve 'kafka:9092': No such host is known.  (after 6817ms in state CONNECT, 7 identical error(s) suppressed)

My docker-compose file:

version: '3.8'
services:
  zookeeper:
    #image: "debezium/zookeeper:${DEBEZIUM_VERSION}"
    image: "confluentinc/cp-zookeeper:5.5.0"
  #  ports:
  #    - 2181:2181
  #    - 2888:2888
  #    - 3888:3888
    ports:
      - 2181:2181
    environment: 
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    #image: "debezium/kafka:${DEBEZIUM_VERSION}"
    image: "confluentinc/cp-kafka"
    ports:
      - 9092:9092
      #- 29092:29092
    depends_on:
      - zookeeper
    environment:
     - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 
     - KAFKA_BROKERID=1
     - ALLOW_PLAINTEXT_LISTENER="yes"
     - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
     - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
     - KAFKA_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
     - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
     - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1

  connect:
    image: "debezium/connect:${DEBEZIUM_VERSION}"
    ports: 
      - 8083:8083
    depends_on:
      - kafka
      - zookeeper
    environment:
      - BOOTSTRAP_SERVERS=kafka:29092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_source_connect_statuses

  kafdrop:
    image: "obsidiandynamics/kafdrop"
    ports:
      - 9000:9000
    depends_on: 
      - connect
    environment: 
      - KAFKA_BROKERCONNECT=kafka:29092

And the C# code:

var config = new ConsumerConfig
{
    BootstrapServers = "kafka:9092",
    GroupId = "simple-dotnet-consumer",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnablePartitionEof = true
};
using var consumer = new ConsumerBuilder<string, string>(config)
    .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
    .Build();

consumer.Subscribe(new List<string>() { "DESKTOP-DBA3LAO.dbo.CashRegister" });
var start = DateTime.Now;
long messageCounter = 0;
try
{
    while (!(Console.KeyAvailable && Console.ReadKey().Key == ConsoleKey.Q))
    {
        var result = consumer.Consume(TimeSpan.FromMilliseconds(100));
        if (result == null) { continue; }
        if (result.IsPartitionEOF) { break; }

        ++messageCounter;
        if (messageCounter % 1024 == 0)
        {
            Console.WriteLine($"Received message key: \"{result.Message.Key}\" value: {result.Message.Value}");
        }
    }
}
catch (OperationCanceledException) { }

consumer.Close();  // commit offset and unsubscribe

var elapsed = DateTime.Now - start;
Console.WriteLine("average throughput: {0:N3} msg/sec, {1} messages over {2:N3} sec", messageCounter / elapsed.TotalSeconds, messageCounter, elapsed.TotalSeconds);

Rob*_*att 6

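You need to set up your listeners correctly. At the moment you are advertising the broker as reachable only at the hostname kafka, which, as @mm8 says, only works inside the Docker network.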

     - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
     - KAFKA_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092

You need a listener that advertises itself as localhost (or whatever hostname your code can reach it on), for example:

     - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
     - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092

Now change your client application to use localhost instead of kafka:

BootstrapServers = "localhost:9092",
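
A quick way to check what the broker actually advertises is to ask it for cluster metadata. The bootstrap address is only used for the first connection; after that, the client connects to whatever host and port the broker returns, so that address has to be resolvable from where your code runs. The following is a minimal sketch, not part of the original setup: it assumes the Confluent.Kafka NuGet package, a project that supports top-level statements, and a broker published on localhost:9092.

```csharp
using System;
using Confluent.Kafka;

// Assumption: the broker's port 9092 is published to the host, as in the compose file above.
var adminConfig = new AdminClientConfig { BootstrapServers = "localhost:9092" };

using var adminClient = new AdminClientBuilder(adminConfig).Build();

try
{
    // Ask the cluster for its metadata. Each broker entry carries the host and port
    // that the broker advertises on the listener this client connected through.
    var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
    foreach (var broker in metadata.Brokers)
    {
        Console.WriteLine($"Broker {broker.BrokerId} advertises {broker.Host}:{broker.Port}");
    }
}
catch (KafkaException e)
{
    // Thrown, for example, when no broker answers within the timeout.
    Console.WriteLine($"Could not fetch metadata: {e.Error.Reason}");
}
```

If the printed host is a name your machine cannot resolve (such as kafka), the advertised listener is still wrong for clients outside the Docker network.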

This is all covered in my blog, if you read it carefully :)