ale*_*oid 2 messaging apache-kafka spring-boot spring-kafka
在我的 Spring Boot/Kafka 项目中,我有以下消费者配置:
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(kafkaProperties));
factory.setConcurrency(10);
return factory;
}
@Bean
public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
return factory;
}
}
Run Code Online (Sandbox Code Playgroud)
这是我的PostConsumer
:
@Component
public class PostConsumer {
@Autowired
private PostService postService;
@KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
public void sendPost(ConsumerRecord<String, Post> consumerRecord) {
postService.sendPost(consumerRecord.value());
}
}
Run Code Online (Sandbox Code Playgroud)
和 application.properties:
spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=groupname
spring.kafka.consumer.enable-auto-commit=false
kafka.topic.post.send=post.send
kafka.topic.post.sent=post.sent
kafka.topic.post.error=post.error
Run Code Online (Sandbox Code Playgroud)
如您所见,我添加了factory.setConcurrency(10); 但它不起作用。所有PostConsumer.sendPost
在同一个线程中执行的名称org.springframework.kafka.KafkaListenerEndpointContainer#1-8-C-1
我希望能够控制并发PostConsumer.sendPost
侦听器的数量以便并行工作。请告诉我如何使用 Spring Boot 和 Spring Kafka 来实现它。
Art*_*lan 10
问题在于我们使用 Apache Kafka Consumer 在 Spring Kafka 中追求的一致性。这种并发分布在提供的主题中的分区之间。如果你只有一个主题和一个分区,那么确实不会有任何并发。关键是在同一线程中消耗来自一个分区的所有记录。
文档中有关于此事的一些信息:https : //docs.spring.io/spring-kafka/docs/2.1.7.RELEASE/reference/html/_reference.html#_concurrentmessagelistenercontainer
例如,如果提供了 6 个 TopicPartition 并且并发为 3;每个容器将获得 2 个分区。对于5个TopicPartition,2个容器会得到2个分区,第三个会得到1个。如果并发大于TopicPartition的数量,会调低并发,让每个容器得到一个分区。
还有 JavaDocs:
/**
* The maximum number of concurrent {@link KafkaMessageListenerContainer}s running.
* Messages from within the same partition will be processed sequentially.
* @param concurrency the concurrency.
*/
public void setConcurrency(int concurrency) {
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
7996 次 |
最近记录: |