ale*_*oid 4 java apache-kafka spring-boot spring-kafka
在我的 Spring Boot Kafka 应用程序中,我有以下使用者配置:
@Bean
public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);
ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
return factory;
}
Run Code Online (Sandbox Code Playgroud)
和消费者:
@KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
public void sendPost(ConsumerRecord<String, Post> consumerRecord, Acknowledgment ack) {
// do some logic
ack.acknowledge();
}
Run Code Online (Sandbox Code Playgroud)
如果我理解正确,现在我有一个我的消费者的单一实例。我想增加后消费者的数量,假设有 5 个消费者将消费不同(不相同)的消息,${kafka.topic.post.send}以加快消息消费。
是否像添加factory.setConcurrency(5);到 my一样简单postKafkaListenerContainerFactory(),例如:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);
ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
factory.setConcurrency(5);
return factory;
}
Run Code Online (Sandbox Code Playgroud)
还是我需要做一些额外的工作才能实现它?
这不是 Apache Kafka 的工作方式。一个想法总是在单个线程中的同一分区中处理记录。这factory.setConcurrency(5);绝对是关于您在一个主题中有多少个分区。所以,如果你只有一个,这个属性不会带来任何价值。如果主题中有 10 个分区,那么 Spring Kafka 会生成 5 个线程,每个线程将处理 2 个分区。
我会说这在参考手册中很清楚:
例如,如果提供了 6 个 TopicPartition 并且并发为 3;每个容器将获得 2 个分区。对于5个TopicPartition,2个容器会得到2个分区,第三个会得到1个。如果并发大于TopicPartition的数量,会调低并发,让每个容器得到一个分区。
因此,如果您想拥有您所描述的这种并发性,您确实必须在您的主题中创建 5 个分区。只有在此之后,您才能并行处理同一主题中的记录。
| 归档时间: |
|
| 查看次数: |
13555 次 |
| 最近记录: |