Enabling or disabling a Kafka consumer in Spring Boot

Hua*_*Hua 5 configuration spring-boot kafka-consumer-api

I have several Kafka consumers configured in Spring Boot. This is what kafka.properties looks like (only one consumer's configuration is listed here):

kafka.topics=
bootstrap.servers=
group.id=
enable.auto.commit=
auto.commit.interval.ms=
session.timeout.ms=
schema.registry.url=
auto.offset.reset=
kafka.enabled=

Here is the configuration:

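import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration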
@PropertySource({"classpath:kafka.properties"})
public class KafkaConsumerConfig {

    @Autowired
    private Environment env;

    @Bean
    public ConsumerFactory<String, String> pindropConsumerFactory() {
        Map<String, Object> dataRiverProps = new HashMap<>();

        dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
        dataRiverProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("group.id"));
        dataRiverProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, env.getProperty("enable.auto.commit"));
        dataRiverProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, env.getProperty("auto.commit.interval.ms"));
        dataRiverProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("session.timeout.ms"));

        dataRiverProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("auto.offset.reset"));

        return new DefaultKafkaConsumerFactory<>(dataRiverProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(pindropConsumerFactory());
        return factory;
    }
}

Here is the consumer:

@Component
public class KafkaConsumer {

    @Autowired
    private MessageProcessor messageProcessor;


    @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
    public void consumeJson(String message) {
        // processing message
    }
}

Is there a way to use the "kafka.enabled" property to control whether this consumer is created, or whether it retrieves messages? Many thanks!

sam*_*una 19

You can do this with the autoStartup attribute (true/false) of the @KafkaListener annotation on the consumer, like this:

@KafkaListener(id = "foo", topics = "Topic1", groupId = "group_id",
        containerFactory = "kafkaListenerContainerFactory", autoStartup = "${listen.auto.start:false}")
public void consume(String message) {
    //System.out.println("Consumed message: " + message);
}

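  • `autoStartup` is available since `spring-kafka:2.2` (4 upvotes)
  • I was personally looking for how to do this at runtime for a KafkaListener that is already enabled, and found this helpful: https://github.com/spring-projects/spring-kafka/issues/938 (4 upvotes)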
  • `@Bean public ConcurrentKafkaListenerContainerFactory<String, EventMessage> kafkaListener() { ConcurrentKafkaListenerContainerFactory<String, EventMessage> factory = new ConcurrentKafkaListenerContainerFactory<String, EventMessage>(); factory.setConsumerFactory(consumerFactory()); factory.setAutoStartup(false); return factory; }` (2 upvotes)
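
Applied to the listener from the question, the `autoStartup` attribute and the runtime start/stop mentioned in the comments could be combined roughly as follows. This is only a sketch: the class names `KafkaConsumerSwitch` and `SwitchableKafkaConsumer`, the listener id `pindropListener`, and the `:true` default are assumptions made for illustration. `idIsGroup = false` is set because, by default, a listener `id` is also used as the consumer group id, which would override the `group.id` from the consumer factory.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerSwitch {

    // Hypothetical listener id; it must match the id on the @KafkaListener below.
    static final String LISTENER_ID = "pindropListener";

    private final KafkaListenerEndpointRegistry registry;

    public KafkaConsumerSwitch(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    // Starts the listener container if it exists and is not already running.
    public void enable() {
        MessageListenerContainer container = registry.getListenerContainer(LISTENER_ID);
        if (container != null && !container.isRunning()) {
            container.start();
        }
    }

    // Stops the listener container if it exists and is currently running.
    public void disable() {
        MessageListenerContainer container = registry.getListenerContainer(LISTENER_ID);
        if (container != null && container.isRunning()) {
            container.stop();
        }
    }
}

@Component
class SwitchableKafkaConsumer {

    // autoStartup reads kafka.enabled at startup; when the property is absent
    // the assumed default is true, so the listener starts as before.
    @KafkaListener(id = KafkaConsumerSwitch.LISTENER_ID,
            idIsGroup = false,
            topics = "#{'${kafka.topics}'.split(',')}",
            containerFactory = "kafkaListenerContainerFactory",
            autoStartup = "${kafka.enabled:true}")
    public void consumeJson(String message) {
        // processing message
    }
}
```

With `kafka.enabled=false` the listener bean still exists but its container is not started, so no consumer joins the group until `enable()` is called.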

Ole*_*kyi 5

For example, to disable the Kafka configuration entirely, you can:

  1. Annotate KafkaConsumerConfig with

    @ConditionalOnProperty(value = "kafka.enabled", matchIfMissing = true)

  2. Remove @Component from the KafkaConsumer class and define it as a @Bean in KafkaConsumerConfig
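
The two steps above might look roughly like this. It is a sketch under assumptions: the consumer factory and container factory beans from the question are left out for brevity, and the listener is shown as a nested class only to keep the example in one file. One caveat, as far as I know: the condition on a configuration class is evaluated before that class's own @PropertySource is loaded, so `kafka.enabled` would have to come from application.properties, an environment variable, or a similar source rather than from kafka.properties.

```java
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

// The whole class is skipped when kafka.enabled=false; when the property is
// missing, matchIfMissing = true keeps the configuration active.
@Configuration
@ConditionalOnProperty(value = "kafka.enabled", matchIfMissing = true)
public class KafkaConsumerConfig {

    // The pindropConsumerFactory and kafkaListenerContainerFactory beans
    // from the question would stay here unchanged (omitted in this sketch).

    // The listener is registered only when this configuration is active.
    @Bean
    public KafkaConsumer kafkaConsumer() {
        return new KafkaConsumer();
    }

    // No @Component here, so component scanning does not create the listener
    // on its own when the configuration is disabled.
    public static class KafkaConsumer {

        @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}",
                containerFactory = "kafkaListenerContainerFactory")
        public void consumeJson(String message) {
            // processing message
        }
    }
}
```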

To control message retrieval in KafkaConsumer:

  1. Inject the property value into KafkaConsumer: @Value("${kafka.enabled}") private Boolean enabled;

  2. Then use a simple if check inside the method annotated with @KafkaListener
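
The guard itself can be sketched in plain Java, with the Spring wiring reduced to comments so the snippet runs on its own. `GuardedListener` and its `processed` list are hypothetical names used only for illustration. Note that with this approach the consumer still connects, joins the group, and polls; a message that arrives while the flag is off is skipped and, once its offset is committed, is not delivered again.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for the KafkaConsumer class from the question.
final class GuardedListener {

    private final boolean enabled;
    private final List<String> processed = new ArrayList<>();

    // In Spring this flag would be injected, for example with
    // @Value("${kafka.enabled}") on a field or constructor parameter.
    GuardedListener(boolean enabled) {
        this.enabled = enabled;
    }

    // In Spring this method would carry the @KafkaListener annotation.
    void consumeJson(String message) {
        if (!enabled) {
            return; // the message is skipped, not stored for later
        }
        processed.add(message); // placeholder for the real processing
    }

    // Returns a copy of the messages that were actually processed.
    List<String> processed() {
        return new ArrayList<>(processed);
    }
}
```

If skipped messages must be processed later, leaving the container stopped (for example through autoStartup) is likely the better fit, since nothing is consumed while it is stopped.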