roa*_*art 10 javascript apache-kafka docker-compose kafkajs
刚刚开始学习Kafka。我正在尝试设置一个包含 2 个代理的小型 kafka 集群。当两个经纪人都启动时,我成功地将消息发送到我的主题。我想测试当 2 个代理之一完成时集群的行为。我使用 docker stop kafka1 停止了我的主代理(Kafka1),然后我尝试向我的集群发送一条消息,看看我的生产者是否能够理解他需要发送到 kafka2,因为 kafka1 已关闭。
但是我不断收到以下错误:
{“level”:“ERROR”,“timestamp”:“2022-07-19T18:59:46.891Z”,“logger”:“kafkajs”,“message”:“[连接]响应元数据(键:3,版本: 6)","broker":"localhost:39092","clientId":"my-app","error":"这个主题分区没有领导者,因为我们正在进行领导者选举" ,“correlationId”:1,“大小”:144}
下面是我的生产者代码:
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:29092', 'localhost:39092'],
})
const producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner })
await producer.connect()
await producer.send({
topic: 'coverageEvolved',
messages: [
{ value: JSON.stringify(bodyActiveMq), key: bodyActiveMq[0].roamPartner},
],
})
await producer.disconnect()
Run Code Online (Sandbox Code Playgroud)
下面是我的 docker-compose-file:
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
restart: unless-stopped
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
volumes:
- ./zookeeper/data:/var/lib/zookeeper/data
kafka-1:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
restart: unless-stopped
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-
1:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
volumes:
- ./kafka1/data:/var/lib/kafka/data
kafka-2:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 39092:39092
restart: unless-stopped
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:39092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
volumes:
- ./kafka2/data:/var/lib/kafka/data
Run Code Online (Sandbox Code Playgroud)
如果您没有以其他方式创建主题,Kafka 将默认创建coverageEvolved仅包含一个副本和一个分区的代码中使用的主题。
如果您终止托管该副本的代理,则将不会生成可同步的副本领导者。
您可以使用Kafkajs创建主题。
另外值得一提的是,有一个事务主题只有一个副本(您缺少它的环境变量)。这主要只与 Java 客户端相关,因为从 Kafka 3.0 开始默认启用事务生产者
| 归档时间: |
|
| 查看次数: |
15320 次 |
| 最近记录: |