如何手动启动Kafka监听器?

mic*_*obg 3 java spring apache-kafka spring-boot spring-kafka

我有一些方法注释,@KafkaListener但我只想手动启动其中的一些(取决于某些条件)。

@KafkaListener(id = "consumer1", topics = "topic-name", clientIdPrefix = "client-prefix", autoStartup = "false")
public void consumer1(String message) {
    // consume
}
Run Code Online (Sandbox Code Playgroud)
@PostConstruct
private void startConsumers() {
    if (true) {
        kafkaListenerEndpointRegistry.getListenerContainer("consumer1").start();
    }
}
Run Code Online (Sandbox Code Playgroud)

但此时kafkaListenerEndpointRegistry.getListenerContainers()是空列表并kafkaListenerEndpointRegistry.getListenerContainer("consumer1")返回null。所以也许@PostConstruct调用方法的时间太早了,监听器还没有注册。我尝试使用注释startConsumers()方法,@Scheduled(fixedDelay = 100)并且侦听器已经可用。但是@Scheduled对于我想在启动应用程序后调用一次的东西,使用并不是一个好的决定。

Gar*_*ell 7

你不能这样做@PostConstruct- 在应用程序上下文生命周期中还为时过早。

实现SmartLifecyle将阶段设置为Integer.MAX_VALUE并在start()方法中启动容器。

或者使用@EventListenerand 监听ApplicationStartedEvent(如果使用 Spring Boot)ContextRefreshedEvent或非 Boot Spring 应用程序。

  • ```@EventListener public void onAppStarted(ApplicationStartedEvent event) {}``` 正是我正在寻找的。谢谢! (4认同)