关闭卡夫卡消费者

Pun*_*cky 2 java spring apache-kafka

我在此链接中阅读了Kafka高级消费者的详细信息,并看到以下声明-

实际上,更常见的模式是无限期地使用睡眠并使用关闭钩子来触发干净关闭。

是否有任何执行此操作的示例或有帮助的指针?

Nau*_*lus 5

这将是一个无限循环的例子

public void run() {
    try {
      consumer.subscribe(topics);
      while (true) {
          ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        //do something
      }
    } catch (WakeupException e) {
      // do nothing we are shutting down 
    } finally {
      consumer.close();
    }
  }

  public void shutdown() {
    consumer.wakeup();
  }
}
Run Code Online (Sandbox Code Playgroud)

这将是您的关机钩子。

@PostConstruct
    private void init(){
        addShutdownHook(); 
    }

 private void addShutdownHook(){
   Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

            @Override
            public void run() {
                shutdown();
            }
        }));
    }
Run Code Online (Sandbox Code Playgroud)