Rak*_*iya 5 java apache-kafka kafka-consumer-api
如果我kafka在线程内运行消费者而不从外部操作它,例如让消费者进入睡眠状态或唤醒他,是否有必要WakeupException正确处理?什么是处理它的好方法?
消费者正在 Web 服务上运行以不断地从队列中提取数据,并且永远不应该停止这样做。此外,该服务没有空闲或挂起状态。在 Kafka 的文档中指出只有当 kafka 消费者被另一个线程阻塞时才会抛出异常,但这永远不会发生。https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/errors/WakeupException.html
卡夫卡版本 0.10.0.0
catch (WakeupException e) {
LOG.info("Kafka Consulmer wakeup exception");
// Ignore exception if closing
if (!closed.get()) {
throw e;
}
} finally {
consumer.close();
}
Run Code Online (Sandbox Code Playgroud)
问候, 拉克什
您可以在 Confluence 文档中找到一些示例,这些示例展示了如何正确处理 WakeupException [Confluence Consumer Doc]
基本上,如果您使用consumer.poll(Integer.MAX_VALUE)消费者将阻塞,直到获取消息。在这种情况下,如果您想停止消费,您可以调用consumer.wakeup()(从其他线程)并捕获异常以正确关闭。
此外,在同步提交偏移量时,调用consumer.wakeup()将抛出WakeupException.
| 归档时间: |
|
| 查看次数: |
9361 次 |
| 最近记录: |