如何优雅地关闭正在运行的 Kafka Consumer

Ron*_*ani 3 apache-kafka kafka-consumer-api spring-kafka

我需要根据某些数据库驱动的属性打开/关闭 Kafka 消费者。怎样才能实现呢。

我想到的一种方法是:当消费者标志关闭时,从消费者抛出异常。容器工厂配置定义为

factory.setErrorHandler(new SeekToCurrentErrorHandler());
Run Code Online (Sandbox Code Playgroud)

但它积极寻求同样的信息。

有什么办法可以根据需要关闭心跳然后再打开。

Gar*_*ell 6

您可以stop()start()侦听器容器一起使用。

看来您正在使用,@KafkaListener因为您正在使用容器工厂。

在这种情况下

@KafkaListener(id = "foo" ...)
Run Code Online (Sandbox Code Playgroud)

然后使用KafkaListenerEndpointRegistry豆...

registry.getListenerContainer("foo").stop();
Run Code Online (Sandbox Code Playgroud)