Spring Kafka MessageListenerContainer

use*_*837 8 apache-kafka kafka-consumer-api spring-kafka

我看到 spring Kafka 代码,我有一些疑问:

  1. 如果我们使用 1 个 @kafkaListener 和 2 个主题,那么 spring Kafka 将创建一个 MessageListenerContainer。如果我为每个主题使用单独的 @kafkaListener ,那么将创建 2 个 MessageListenerContainer 。

  2. MessageListenerContainer 是消费者的意思吗?

  3. 如果我在 ConcurrentKafkaListenerContainerFactory 中将并发数设置为 4,那么这意味着对于每个 kafkaListener,我使用代理打开 4 个线程?这意味着协调员将他们视为 4 个不同的消费者。

  4. 轮询如何与 kafkaListener 一起使用?它每次只从broker那里获取1个ConsumerRecord吗?

请帮忙。

Gar*_*ell 8

有两种实现MessageListenerContainer- KafkaMessageListenerContainer(KMLC) 和ConcurrentMessageListenerContainer(CMLC)。

CMLC 只是一个或多个 KMLC 的包装,KMLC 的数量由concurrency.

@KafkaListener始终使用 CMLC。

每个 KMLC 都有一个Consumer(和一个线程)。该线程不断地poll()使用指定的 s 消费者pollTimeout

主题/分区如何在 KMLC 之间分布取决于

  • 主题有多少个分区
  • 消费者的partition.assignment.strategy财产

如果您有多个主题,其分区数少于并发数,则可能需要备用分区分配器,例如循环分配器,否则您将拥有没有分配的空闲容器。

  1. 那是对的; 如果您明确希望每个主题使用不同的容器,则可以@KafkaListener在同一方法上提供多个注释。
  2. 请参阅上面我的解释。
  3. 这是正确的——这是与 Kafka 实现并发的唯一方法(无需添加非常复杂的逻辑来管理偏移量)。
  4. 每次轮询返回的记录数取决于多个消费者属性 , max.poll.records, fetch.min.bytes, fetch.max.wait.ms

  • 那是对的。如果消费者具有相同的“group.id”属性,则分区将在它们之间分配。如果他们有不同的“group.id”,他们都会收到所有记录。CLMC 的子 KMLC 都获得相同的“group.id”。是的; 您必须让分区数 >= `实例数 * 并发数` 以避免消费者闲置。 (2认同)