Spring Kafka 和主题消费者的数量

ale*_*oid 2 messaging apache-kafka spring-boot spring-kafka

在我的 Spring Boot/Kafka 项目中,我有以下消费者配置:

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(kafkaProperties));
        factory.setConcurrency(10);    
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

        ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(postConsumerFactory(kafkaProperties));

        return factory;
    }

}
Run Code Online (Sandbox Code Playgroud)

这是我的PostConsumer

@Component
public class PostConsumer {

    @Autowired
    private PostService postService;

    @KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
    public void sendPost(ConsumerRecord<String, Post> consumerRecord) {

        postService.sendPost(consumerRecord.value());

    }

}
Run Code Online (Sandbox Code Playgroud)

和 application.properties:

spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=groupname
spring.kafka.consumer.enable-auto-commit=false
kafka.topic.post.send=post.send
kafka.topic.post.sent=post.sent
kafka.topic.post.error=post.error
Run Code Online (Sandbox Code Playgroud)

如您所见,我添加了factory.setConcurrency(10); 但它不起作用。所有PostConsumer.sendPost在同一个线程中执行的名称org.springframework.kafka.KafkaListenerEndpointContainer#1-8-C-1

我希望能够控制并发PostConsumer.sendPost侦听器的数量以便并行工作。请告诉我如何使用 Spring Boot 和 Spring Kafka 来实现它。

Art*_*lan 10

问题在于我们使用 Apache Kafka Consumer 在 Spring Kafka 中追求的一致性。这种并发分布在提供的主题中的分区之间。如果你只有一个主题和一个分区,那么确实不会有任何并发​​。关键是在同一线程中消耗来自一个分区的所有记录。

文档中有关于此事的一些信息:https : //docs.spring.io/spring-kafka/docs/2.1.7.RELEASE/reference/html/_reference.html#_concurrentmessagelistenercontainer

例如,如果提供了 6 个 TopicPartition 并且并发为 3;每个容器将获得 2 个分区。对于5个TopicPartition,2个容器会得到2个分区,第三个会得到1个。如果并发大于TopicPartition的数量,会调低并发,让每个容器得到一个分区。

还有 JavaDocs:

/**
 * The maximum number of concurrent {@link KafkaMessageListenerContainer}s running.
 * Messages from within the same partition will be processed sequentially.
 * @param concurrency the concurrency.
 */
public void setConcurrency(int concurrency) {
Run Code Online (Sandbox Code Playgroud)

  • 您可以使用命令行工具`kafka-topics.sh` 来查询和调整主题的分区数。您还可以添加一个 `NewTopic` `@Bean`,如果 `NewTopic` 的数量大于当前的数量,则引导的自动配置的 `KafkaAdmin` 将调整分区。 (3认同)