如何根据启动标志启动@KafkaListener

Bee*_*eez 4 java apache-kafka

仅当标志设置为 true 时,我才尝试启动 KafkaListener。

@Component
public class KafkaTopicConsumer {

//Somehow wrap the listener to only start when a property value is set to true

@KafkaListener(topics = "#{@consumerTopic}", groupId = "#{@groupName}")
public void consumeMessage(ConsumerRecord<String, String> message) throws IOException {
    logger.info("Consumed message from topic: {} with message: {}", message.topic(), message);
}
Run Code Online (Sandbox Code Playgroud)

有没有办法仅确保当 start.consumer 属性等属性设置为 true 时启动侦听器?我不希望只有在我指定要启动应用程序时才启动侦听器。有没有一个好的方法来处理这个用例?

rph*_*rph 9

首先,您需要设置autoStartup容器false并为其命名。然后您需要根据使用的标志手动启动它@EventListener

@Component
public class KafkaTopicConsumer {
    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Value("${start.consumer}")
    private boolean shouldStart;

    @KafkaListener(id = "myListener", autoStartup = "false", topics = "#{@consumerTopic}", groupId = "#{@groupName}")
    public void consumeMessage(ConsumerRecord<String, String> message) throws IOException {
        logger.info("Consumed message from topic: {} with message: {}", message.topic(), message);
    }

    @EventListener
    public void onStarted(ApplicationStartedEvent event) {
        if (shouldStart) {
            MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("myListener");
            listenerContainer.start();    
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

注意: @EventListener将确保容器正确装载,如果使用@PostConstruct它可能无法工作。

编辑

使用注释添加了属性的实际读取@Value

注意:此方法具有额外的灵活性,只需进行一些更改即可动态调用start和方法(例如使用 JMX)。stop这有利于我们想要禁用消费者并稍后启用它而无需重新启动应用程序的场景。

正如@Makoton 的回答中正确所述,另一个好方法是使用@ConditionalOnProperty. 请注意,在您的示例中,您可以使用它而@Component不是@Bean手动定义。

@Component
@ConditionalOnProperty(
        value = "start.consumer",
        havingValue = "true")
public class KafkaTopicConsumer { // ...
Run Code Online (Sandbox Code Playgroud)

这一切都取决于您所需的灵活性水平。


小智 6

您可以将 ConditionalsBeans 与属性一起使用

@Bean
@ConditionalOnProperty(
  value="my.custom.flag", 
  havingValue = "true")
public KafkaListener kafkaListener{
 .....
}
Run Code Online (Sandbox Code Playgroud)

条件 bean 允许您根据属性或自定义条件启动 bean。 参考