Svi*_*ana 4 java spring apache-kafka spring-boot spring-kafka
我正在使用 spring boot + spring @KafkaListener。我期望的行为是:我的 kafka 侦听器在 10 个线程中读取消息。这样,如果其中一个线程挂起,其他消息将继续读取和处理消息。
我定义了bean
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory)
{
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setMissingTopicsFatal(false);
factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
return factory;
}
Run Code Online (Sandbox Code Playgroud)
和弹簧引导配置:
spring.kafka.listener.concurrency=10
Run Code Online (Sandbox Code Playgroud)
我看到所有配置都有效,我在 jmx 中看到了我的 10 个线程:
但后来我做了这样的测试:
@KafkaListener(topics = {
"${topic.name}" }, clientIdPrefix = "${kafka.client.id.prefix}", idIsGroup = false, id = "${kafka.listener.name}", containerFactory = "kafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record)
{
if(record.getVersion() < 3) {
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
else
System.out.println("It works!");
}
Run Code Online (Sandbox Code Playgroud)
如果版本 < 3,则挂起,否则 - 工作。我用版本 1,2 和 3 发送了 3 条消息。我希望版本 1 和 2 的消息会挂起,但版本 3 将在涉及侦听器时进行处理。但不幸的是,版本 3 的消息在开始处理之前等待消息 1 和 2。
也许我的期望不是真的,这是 kafka 听众的正确行为。请帮我处理kafka并发,为什么会这样?
Gar*_*ell 13
Kafka 不是这样工作的。您至少需要与消费者一样多的分区(由concurrencyspring 容器控制)。
此外,一次只有一个消费者(在一个组中)可以从一个分区中消费,因此,即使您增加了分区,“卡住”消费者后面的同一分区中的记录也不会被其他消费者收到。
| 归档时间: |
|
| 查看次数: |
14233 次 |
| 最近记录: |