vic*_*cky 4 java apache-kafka spring-boot spring-kafka
我已经实现了 Kafka 消费者,现在我有了一个场景。
要执行上述操作,我需要使用石英的调度作业(已编写)暂停/恢复 Kafka 使用者,该作业将数据从表 1 复制到表 2。但在此活动期间,我希望我的 Kafka 侦听器暂停,并且复制完成后,它应该恢复。
我的实现:
@KafkaListener(topicPartitions =
{ @TopicPartition(topic = "data_pipe", partitions = { "0" })})
public void listen(ConsumerRecord<String, String> cr) throws Exception {
Run Code Online (Sandbox Code Playgroud)
如果您使用 'kafkaListener annotation' 自动创建的 KafkaListenerEndpointRegistry bean,那么您可以像以下代码一样使用它:
@Component
public class KafkaManager {
private final KafkaListenerEndpointRegistry registry;
public KafkaManager(KafkaListenerEndpointRegistry registry) {
this.registry = registry;
}
public void pause() {
registry.getListenerContainers().forEach(MessageListenerContainer::pause);
}
public void resume() {
registry.getListenerContainers().forEach(MessageListenerContainer::resume);
}
}
Run Code Online (Sandbox Code Playgroud)
文件:https : //docs.spring.io/spring-kafka/reference/html/#pause-resume