卡夫卡 这个主题分区没有领导者,因为我们正在进行领导者选举

roa*_*art 10 javascript apache-kafka docker-compose kafkajs

刚刚开始学习Kafka。我正在尝试设置一个包含 2 个代理的小型 kafka 集群。当两个经纪人都启动时,我成功地将消息发送到我的主题。我想测试当 2 个代理之一完成时集群的行为。我使用 docker stop kafka1 停止了我的主代理(Kafka1),然后我尝试向我的集群发送一条消息,看看我的生产者是否能够理解他需要发送到 kafka2,因为 kafka1 已关闭。

但是我不断收到以下错误:

{“level”:“ERROR”,“timestamp”:“2022-07-19T18:59:46.891Z”,“logger”:“kafkajs”,“message”:“[连接]响应元数据(键:3,版本: 6)","broker":"localhost:39092","clientId":"my-app","error":"这个主题分区没有领导者,因为我们正在进行领导者选举" ,“correlationId”:1,“大小”:144}

下面是我的生产者代码:

const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:29092', 'localhost:39092'],
})
const producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner })

await producer.connect()

await producer.send({
  topic: 'coverageEvolved',
  messages: [
    { value: JSON.stringify(bodyActiveMq), key: bodyActiveMq[0].roamPartner},
  ],
})

await producer.disconnect()
Run Code Online (Sandbox Code Playgroud)

下面是我的 docker-compose-file:

version: '2'
services:
    zookeeper:
    image: confluentinc/cp-zookeeper:latest
    restart: unless-stopped
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
    volumes:
      - ./zookeeper/data:/var/lib/zookeeper/data
    kafka-1:
        image: confluentinc/cp-kafka:latest
        depends_on:
          - zookeeper
        ports:
            - 29092:29092
        restart: unless-stopped
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka- 
    1:9092,PLAINTEXT_HOST://localhost:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
        volumes:
          - ./kafka1/data:/var/lib/kafka/data
      kafka-2:
        image: confluentinc/cp-kafka:latest
        depends_on:
          - zookeeper
        ports:
          - 39092:39092
        restart: unless-stopped
        environment:
          KAFKA_BROKER_ID: 2
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:39092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
        volumes:
          - ./kafka2/data:/var/lib/kafka/data
Run Code Online (Sandbox Code Playgroud)

cri*_*007 5

如果您没有以其他方式创建主题,Kafka 将默认创建coverageEvolved仅包含一个副本和一个分区的代码中使用的主题。

如果您终止托管该副本的代理,则将不会生成可同步的副本领导者。

您可以使用Kafkajs创建主题

另外值得一提的是,有一个事务主题只有一个副本(您缺少它的环境变量)。这主要只与 Java 客户端相关,因为从 Kafka 3.0 开始默认启用事务生产者