use*_*182 2 java apache-kafka spring-boot kafka-consumer-api spring-kafka
我正在编写一个基于Java的Kafka Consumer应用程序.我正在为我的应用程序使用kafka-clients,Spring Kafka和Spring启动.虽然Spring启动让我轻松编写Kafka使用者(没有真正编写ConcurrentKafkaListenerContainerFactory,ConsumerFactory等),但我希望能够为这些使用者定义/定制一些属性.但是,我找不到使用Spring启动的简单方法.例如:我有兴趣设置的一些属性是 -
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
我接过一看春天开机前定义的属性在这里.
此外,基于前面一个问题在这里,我想设置对消费者的并发性,但无法找到一个配置,为application.properties驱动的方式做到这一点使用Spring启动.
一种显而易见的方法是ConcurrentKafkaListenerContainerFactory, ConsumerFactory
在我的Spring Context中再次定义类并从那里开始工作.我想了解是否有更简洁的方法,特别是因为我使用的是Spring Boot.
版本 -
在您引用的网址中,向下滚动到
spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
Run Code Online (Sandbox Code Playgroud)
spring-kafka - 1.1.0.RELEASE
我建议升级到至少1.3.5; 由于KIP-62,它具有更简单的线程模型.
编辑
使用Boot 2.0,您可以设置任意生产者,使用者,管理员,公共属性,如引导文档中所述.
spring.kafka.consumer.properties.heartbeat.interval.ms
Run Code Online (Sandbox Code Playgroud)
使用Boot 1.5,只有这里spring.kafka.properties
描述.
这为生产者和消费者设置了属性,但是您可能会在日志中看到生成器的未使用/不支持属性的一些噪音.
或者,您可以简单地覆盖Boot的消费者工厂并根据需要添加属性...
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
Map<String, Object> consumerProps = properties.buildConsumerProperties();
consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5_000);
return new DefaultKafkaConsumerFactory<Object, Object>(consumerProps);
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
6728 次 |
最近记录: |