Spring Kafka 单个主题的多个消费者消费不同的消息

ale*_*oid 4 java apache-kafka spring-boot spring-kafka

在我的 Spring Boot Kafka 应用程序中,我有以下使用者配置:

@Bean
public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
    return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

    kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
    kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

    ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setConsumerFactory(postConsumerFactory(kafkaProperties));

    return factory;
}
Run Code Online (Sandbox Code Playgroud)

和消费者:

@KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
public void sendPost(ConsumerRecord<String, Post> consumerRecord, Acknowledgment ack) {

    // do some logic

    ack.acknowledge();
}
Run Code Online (Sandbox Code Playgroud)

如果我理解正确,现在我有一个我的消费者的单一实例。我想增加后消费者的数量,假设有 5 个消费者将消费不同(不相同)的消息,${kafka.topic.post.send}以加快消息消费。

是否像添加factory.setConcurrency(5);到 my一样简单postKafkaListenerContainerFactory(),例如:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

    kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
    kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

    ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
    factory.setConcurrency(5);

    return factory;
}
Run Code Online (Sandbox Code Playgroud)

还是我需要做一些额外的工作才能实现它?

Art*_*lan 6

这不是 Apache Kafka 的工作方式。一个想法总是在单个线程中的同一分区中处理记录。这factory.setConcurrency(5);绝对是关于您在一个主题中有多少个分区。所以,如果你只有一个,这个属性不会带来任何价值。如果主题中有 10 个分区,那么 Spring Kafka 会生成 5 个线程,每个线程将处理 2 个分区。

我会说这在参考手册中很清楚:

例如,如果提供了 6 个 TopicPartition 并且并发为 3;每个容器将获得 2 个分区。对于5个TopicPartition,2个容器会得到2个分区,第三个会得到1个。如果并发大于TopicPartition的数量,会调低并发,让每个容器得到一个分区。

因此,如果您想拥有您所描述的这种并发性,您确实必须在您的主题中创建 5 个分区。只有在此之后,您才能并行处理同一主题中的记录。