如何使用spring boot设置kafka使用者并发性

use*_*182 2 java apache-kafka spring-boot kafka-consumer-api spring-kafka

我正在编写一个基于Java的Kafka Consumer应用程序.我正在为我的应用程序使用kafka-clients,Spring Kafka和Spring启动.虽然Spring启动让我轻松编写Kafka使用者(没有真正编写ConcurrentKafkaListenerContainerFactory,ConsumerFactory等),但我希望能够为这些使用者定义/定制一些属性.但是,我找不到使用Spring启动的简单方法.例如:我有兴趣设置的一些属性是 -

ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG

我接过一看春天开机前定义的属性在这里.

此外,基于前面一个问题在这里,我想设置对消费者的并发性,但无法找到一个配置,为application.properties驱动的方式做到这一点使用Spring启动.

一种显而易见的方法是ConcurrentKafkaListenerContainerFactory, ConsumerFactory在我的Spring Context中再次定义类并从那里开始工作.我想了解是否有更简洁的方法,特别是因为我使用的是Spring Boot.

版本 -

  • kafka-clients - 0.10.0.0-SASL
  • spring-kafka - 1.1.0.RELEASE
  • 春季靴子 - 1.5.10.RELEASE

Gar*_*ell 5

在您引用的网址中,向下滚动到

spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
Run Code Online (Sandbox Code Playgroud)

spring-kafka - 1.1.0.RELEASE

我建议升级到至少1.3.5; 由于KIP-62,它具有更简单的线程模型.

编辑

使用Boot 2.0,您可以设置任意生产者,使用者,管理员,公共属性,如引导文档中所述.

spring.kafka.consumer.properties.heartbeat.interval.ms
Run Code Online (Sandbox Code Playgroud)

使用Boot 1.5,只有这里spring.kafka.properties描述.

这为生产者和消费者设置了属性,但是您可能会在日志中看到生成器的未使用/不支持属性的一些噪音.

或者,您可以简单地覆盖Boot的消费者工厂并根据需要添加属性...

@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
    Map<String, Object> consumerProps = properties.buildConsumerProperties();
    consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5_000);
    return new DefaultKafkaConsumerFactory<Object, Object>(consumerProps);
}
Run Code Online (Sandbox Code Playgroud)