Spring for Apache Kafka 中的侦听器容器是什么?

Vin*_*Vin 4 apache-kafka kafka-consumer-api spring-kafka

我知道要使方法成为 Kafka 消息侦听器的目标,我必须使用 @KafkaListener 注释标记此方法。此注释允许通过 containerFactory 元素指定 KafkaListenerContainerFactory。

下面是 Baeldung Spring Kafka 教程的一些片段。

KafkaConsumerConfig.java

private ConsumerFactory<String, String> consumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    ...
    return new DefaultKafkaConsumerFactory<>(props);
}

private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory(groupId));
    return factory;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
    return kafkaListenerContainerFactory("foo");
}
Run Code Online (Sandbox Code Playgroud)

消息监听器.java

@KafkaListener(
    topics = "${message.topic.name}", 
    groupId = "foo", 
    containerFactory = "fooKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group 'foo': " + message);
    ...
}
Run Code Online (Sandbox Code Playgroud)

我不明白的是为什么我们需要一个监听器容器工厂。什么是侦听器容器?当以这种方式注释方法时会发生什么?

Gar*_*ell 5

侦听器“容器”是跨多种技术(JMS、RabbitMQ、Kafka、AWS 等)的 Spring 概念。

监听器被定义为一个 POJO bean 方法并且是容器的一个属性(容器“包含”监听器)。

容器负责与代理交互以接收消息并根据侦听器类型对每条消息或一批消息调用您的侦听器方法。

这意味着您的应用程序代码不必处理与代理交互的机制,您可以只专注于您的业务逻辑。

该框架发现任何@KafkaListener方法并使用工厂为每个方法创建一个容器。

  • 不要在对旧答案的评论中提出新问题;它不能帮助人们找到答案。侦听器容器为每个“并发”启动一个线程(默认为 1);每个线程创建一个使用者并与其 API 交互以获取记录并将其传递给侦听器,一次一个或批量传递,具体取决于侦听器的类型。 (3认同)
  • 是的; 但并发性开始发挥作用;如果您的主题有多个分区,并且容器并发度 &gt; 1,您将获得可以同时调用您的侦听器及其订阅分区中的记录的消费者数量。在本例中,我们有一个 ConcurrentMessageListenerContainer 和 n 个“子”KafkaMessageListenerContainer,但它们都调用同一个侦听器实例。 (2认同)
  • 如果主题只有一个分区,就会发生这种情况 - Kafka 只允许每个分区有一个消费者,除非它们位于不同的组中。当只有一个分区时,将创建第二个消费者,但会分配 0 个分区 - 您可以在日志中看到这一点。 (2认同)