Kafka 消费者唤醒异常处理 Java

Rak*_*iya 5 java apache-kafka kafka-consumer-api

如果我kafka在线程内运行消费者而不从外部操作它,例如让消费者进入睡眠状态或唤醒他,是否有必要WakeupException正确处理?什么是处理它的好方法?

消费者正在 Web 服务上运行以不断地从队列中提取数据,并且永远不应该停止这样做。此外,该服务没有空闲或挂起状态。在 Kafka 的文档中指出只有当 kafka 消费者被另一个线程阻塞时才会抛出异常,但这永远不会发生。https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/errors/WakeupException.html

卡夫卡版本 0.10.0.0

catch (WakeupException e) {
    LOG.info("Kafka Consulmer wakeup exception");
    // Ignore exception if closing
    if (!closed.get()) {
        throw e;
    }
} finally {
    consumer.close();
}
Run Code Online (Sandbox Code Playgroud)

问候, 拉克什

fhu*_*ois 8

您可以在 Confluence 文档中找到一些示例,这些示例展示了如何正确处理 WakeupException [Confluence Consumer Doc]

基本上,如果您使用consumer.poll(Integer.MAX_VALUE)消费者将阻塞,直到获取消息。在这种情况下,如果您想停止消费,您可以调用consumer.wakeup()(从其他线程)并捕获异常以正确关闭。

此外,在同步提交偏移量时,调用consumer.wakeup()将抛出WakeupException.