Geo*_*rge 9 apache-kafka kafka-consumer-api spring-kafka
有什么不同?KafkaConsumer 和 KafkaListener 可以互换使用吗?
Art*_*lan 13
该@KafkaListener是对高层次的API ConcurrentMessageListenerContainer,其周围滋生几个内部的听众KafkaConsumer。
不同之处在于,当您需要时调用该KafkaConsumerAPI时,该API 可按需轮询poll()。侦听器抽象将围绕poll()它进行无限循环,并且每当记录从poll(). 我们有一个任务执行器,它运行这样的逻辑:
while (isRunning()) {
try {
pollAndInvoke();
}
catch (@SuppressWarnings(UNUSED) WakeupException e) {
// Ignore, we're stopping
}
catch (NoOffsetForPartitionException nofpe) {
this.fatalError = true;
ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
break;
}
catch (Exception e) {
handleConsumerException(e);
}
catch (Error e) { // NOSONAR - rethrown
Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
if (runnable != null) {
runnable.run();
}
this.logger.error("Stopping container due to an Error", e);
wrapUp();
throw e;
}
}
Run Code Online (Sandbox Code Playgroud)
在KafkaConsumer.poll()被称为在pollAndInvoke();。
| 归档时间: |
|
| 查看次数: |
5741 次 |
| 最近记录: |