Spring 和 Kafka - 具有相同组 ID 的多个消费者 - 只有一个消费者获取消息

Vit*_*lar 2 apache-kafka spring-boot spring-kafka

我正在使用 Spring Boot 开始使用 Apache Kafka。我想实现以下事实:同一服务的两个实例以循环方式消耗同一主题的消息。

因此,实例 1 接收主题的第一条消息,实例 2 接收第二条消息,实例 1 接收第三条消息,依此类推。

这是我当前的配置:

spring:
  kafka:
    consumer:
      bootstrap-servers: rommel.local:9092
      group-id: core
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
    producer:
      bootstrap-servers: rommel.local:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    listener:
      missing-topics-fatal: false
      ack-mode: record
Run Code Online (Sandbox Code Playgroud)

这是我的听众:

@KafkaListener(topics = "new-topic", groupId = "core")
    public void consume(String payload) {
        System.out.println("Data received: " + payload);
    }
Run Code Online (Sandbox Code Playgroud)

问题是,当我运行应用程序的两个实例时,只有一个实例收到该主题的消息。

我在日志中意识到了这一点:

Instance 1:
core: partitions assigned: [new-topic-0]
Run Code Online (Sandbox Code Playgroud)
Instance 2:
core: partitions assigned: []
Run Code Online (Sandbox Code Playgroud)

那么当我杀死实例1时。被new-topic-0分配给实例 2。

那么如何实现两个实例循环获取同一主题消息的场景呢?

谢谢!!

Sap*_*asu 8

首先,让我们弄清楚一些事情:

  1. 在Kafka中,并行的基本单位是分区。如果您的主题只有一个分区,则无论您配置多少个消费者实例/线程,只有其中一个会收到消息 - 其他实例将保持空闲状态。如果当前处理消息的实例出现故障,则分区将通过称为“重新平衡”的机制分配给其他消费者实例/线程,然后该实例将开始处理消息。这就是您在这里观察到的行为。

  2. 严格来说,即使有多个分区,也不会是循环。分区将均匀分布在消费者线程/进程之间。因此,每个消费者实例/线程都会有一个或多个专门分配给它的分区,并且它只会处理这些分区中的消息。

  3. 当您向主题生成消息时,消息将到达的位置取决于您将使用的分区键。这意味着具有相同分区键的消息将始终位于同一分区中(除非您在一段时间后增加分区数量)。这也意味着消息的顺序将在给定分区内保持,而不是跨分区。

  4. 在旧版本的 Kafka 中,如果在生成消息时没有分区键,则消息可能会跨分区以循环方式生成。然而,在最新版本的 Kafka 中,即使没有分区键,消息也会根据生产者配置的参数进行批处理batch.size,然后linger.ms再发送出去。