Ave*_*mer 2 c# apache-kafka docker
I'm running Kafka in Docker and have a .NET application that I want to use to consume messages. I followed the tutorials below, with no luck:
https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
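Connect to Kafka running in Docker
Interact with kafka docker container from outside docker host
In my consumer application, if I try to connect directly to the container IP, I get the following error: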
172.21.0.3:9092/bootstrap: Connect to ipv4#172.21.0.3:9092 failed: Unknown error (after 21502ms in state CONNECT)
Error: 1/1 brokers are down
%3|1620652406.633|FAIL|rdkafka#consumer-1| [thrd:172.21.0.3:9092/bootstrap]: 172.21.0.3:9092/bootstrap: Connect to ipv4#172.21.0.3:9092 failed: Unknown error (after 21037ms in state CONNECT, 1 identical error(s) suppressed)
Error: 172.21.0.3:9092/bootstrap: Connect to ipv4#172.21.0.3:9092 failed: Unknown error (after 21037ms in state CONNECT, 1 identical error(s) suppressed)
If I change BootstrapServers to kafka:9092, I get this error instead:
Error: kafka:9092/bootstrap: Failed to resolve 'kafka:9092': No such host is known. (after 6817ms in state CONNECT, 7 identical error(s) suppressed)
My docker-compose file:
version: '3.8'
services:
  zookeeper:
    #image: "debezium/zookeeper:${DEBEZIUM_VERSION}"
    image: "confluentinc/cp-zookeeper:5.5.0"
    # ports:
    #   - 2181:2181
    #   - 2888:2888
    #   - 3888:3888
    ports:
      - 2181:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    #image: "debezium/kafka:${DEBEZIUM_VERSION}"
    image: "confluentinc/cp-kafka"
    ports:
      - 9092:9092
      #- 29092:29092
    depends_on:
      - zookeeper
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_BROKERID=1
      - ALLOW_PLAINTEXT_LISTENER="yes"
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
      - KAFKA_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
  connect:
    image: "debezium/connect:${DEBEZIUM_VERSION}"
    ports:
      - 8083:8083
    depends_on:
      - kafka
      - zookeeper
    environment:
      - BOOTSTRAP_SERVERS=kafka:29092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_source_connect_statuses
  kafdrop:
    image: "obsidiandynamics/kafdrop"
    ports:
      - 9000:9000
    depends_on:
      - connect
    environment:
      - KAFKA_BROKERCONNECT=kafka:29092
And the C# code:
var config = new ConsumerConfig
{
    BootstrapServers = "kafka:9092",
    GroupId = "simple-dotnet-consumer",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnablePartitionEof = true
};

using var consumer = new ConsumerBuilder<string, string>(config)
    .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
    .Build();

consumer.Subscribe(new List<string>() { "DESKTOP-DBA3LAO.dbo.CashRegister" });

var start = DateTime.Now;
long messageCounter = 0;

try
{
    while (!(Console.KeyAvailable && Console.ReadKey().Key == ConsoleKey.Q))
    {
        var result = consumer.Consume(TimeSpan.FromMilliseconds(100));
        if (result == null) { continue; }
        if (result.IsPartitionEOF) { break; }

        ++messageCounter;
        if (messageCounter % 1024 == 0)
        {
            Console.WriteLine($"Received message key: \"{result.Message.Key}\" value: {result.Message.Value}");
        }
    }
}
catch (OperationCanceledException) { }

consumer.Close(); // commit offset and unsubscribe

var elapsed = DateTime.Now - start;
Console.WriteLine(
    "average throughput: {0:N3} msg/sec, {1} messages over {2:N3} sec",
    messageCounter / elapsed.TotalSeconds, messageCounter, elapsed.TotalSeconds);
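You need to configure your listeners correctly. Right now you are advertising the broker as reachable only at the hostname kafka, which, as @mm8 said, only works inside the Docker network. A client uses the bootstrap address only for its first connection; the broker then replies with its advertised listener address, and the client uses that address from then on. From the host, kafka does not resolve, so the connection fails. These are your current settings: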
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
- KAFKA_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
You need a listener that advertises itself as localhost (or whatever hostname your code can reach the broker at), for example:
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
- KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
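For context, here is a rough sketch of how the two lines above might sit inside the kafka service from the question. Image, ports, and the other settings are carried over from the question's compose file, with entries unrelated to listeners left out; the comments describe the intended role of each listener and are my reading of the setup, not something verified against this exact stack.

```yaml
services:
  kafka:
    image: "confluentinc/cp-kafka"
    ports:
      # Only the host-facing listener is published; 29092 is not mapped,
      # so it is reachable only from containers on the same Docker network.
      - 9092:9092
    depends_on:
      - zookeeper
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      # Bind both listeners on all interfaces inside the container.
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      # Addresses the broker hands back to clients after the bootstrap connection:
      #   PLAINTEXT      -> kafka:29092     (other containers, e.g. connect and kafdrop)
      #   PLAINTEXT_HOST -> localhost:9092  (the .NET app running on the Docker host)
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
```

With this layout, connect and kafdrop keep using kafka:29092 as in the question, while anything running on the host goes through localhost:9092.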
Now change your client application to use localhost instead of kafka:
BootstrapServers = "localhost:9092",
This is all covered in my blog, if you read it carefully :)