Spring Kafka 轮询 @KafkaListener 和侦听器确认模式设置为记录

use*_*206 4 apache-kafka kafka-consumer-api spring-kafka

我使用 @KafkaListener 和 ConcurrentKafkaListenerContainerFactory 来监听 3 个 kafka 主题,每个主题有 10 个分区。我对它是如何工作的有几个问题。

    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(30);
        factory.getContainerProperties().setSyncCommits(true);
        return factory;
    }
    @KafkaListener(topics = "topic1", containerFactory="kafkaListenerContainerFactory")
    public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
    }
    @KafkaListener(topics = "topic2", containerFactory="kafkaListenerContainerFactory")
    public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
    }
    @KafkaListener(topics = "topic3", containerFactory="kafkaListenerContainerFactory")
    public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
    }
Run Code Online (Sandbox Code Playgroud)

我的listener.ackmode返回,enable.auto.commit设置为false,partition.assignment.strategy:org.apache.kafka.clients.consumer.RoundRobinAssignor

1)我对并发的理解是,因为我将并发(在工厂级别)设置为30,并且我总共有30个分区(所有三个主题一起)可供读取,所以每个线程将被分配一个分区。我的理解正确吗?如果我在 @KafkaListener 注释中再次覆盖并发性,会产生什么影响?

2)当spring调用poll()方法时,它是否从所有三个主题进行轮询?

3)由于我将listener.ackmode设置为返回,它会等到单个poll()中返回的所有记录完成后再发出下一个poll()吗?另外,如果我的记录处理时间超过 max.poll.interval.ms 会发生什么?假设在一次 poll() 调用中返回 1-100 个偏移量,而我的代码只能在达到 max.poll.interval.ms 之前处理 50 个偏移量,此时 spring 会发出另一个轮询,因为它已经达到了 max.poll .间隔.ms?如果是这样,下一个 poll() 是否会从偏移量 51 返回记录?

非常感谢您的时间和帮助

Gar*_*ell 6

我的listener.ackmode是返回

没有这样的ackmode;由于您没有在工厂中设置它,因此您的实际确认模式是 BATCH (默认)。要使用 ack 模式记录(如果这就是您的意思),您必须配置工厂容器属性。

我对并发的理解是......

你的理解不正确;并发数不能大于分区数最多的主题中的分区数(如果一个监听器监听多个主题)。由于每个主题中只有 10 个分区,因此实际并发数为 10。

覆盖concurrency侦听器上的 只会覆盖出厂设置;您始终需要至少与并发数一样多的分区。

当spring调用poll()方法时,它会轮询所有三个主题吗?

不具备该配置;您有 3 个并发容器,每个容器有 30 个消费者收听一个主题。您有 90 个消费者。

如果您对所有 3 个主题都有一个侦听器,则轮询将返回所有 3 个主题的记录;但您仍然可能有 20 个空闲消费者,具体取决于分区分配器如何分配分区 - 请参阅日志“分区分配”以了解分区的具体分配方式。循环分配器应该可以正确分配它们。

spring 此时会发布另一项民意调查

Spring 无法控制 - 如果您花费的时间太长,Consumer 线程位于侦听器中 - Consumer 不是线程安全的,因此我们无法发出异步轮询。

必须max.poll.records在内部进行处理max.poll.interval.ms,以避免 Kafka 重新平衡分区。

ack 模式没有区别;这一切都是为了及时处理民意调查结果。