Spring Boot Kafka 监听器与消费者

Geo*_*rge 9 apache-kafka kafka-consumer-api spring-kafka

有什么不同?KafkaConsumer 和 KafkaListener 可以互换使用吗?

Art*_*lan 13

@KafkaListener是对高层次的API ConcurrentMessageListenerContainer,其周围滋生几个内部的听众KafkaConsumer

不同之处在于,当您需要时调用该KafkaConsumerAPI时,该API 可按需轮询poll()。侦听器抽象将围绕poll()它进行无限循环,并且每当记录从poll(). 我们有一个任务执行器,它运行这样的逻辑:

        while (isRunning()) {
            try {
                pollAndInvoke();
            }
            catch (@SuppressWarnings(UNUSED) WakeupException e) {
                // Ignore, we're stopping
            }
            catch (NoOffsetForPartitionException nofpe) {
                this.fatalError = true;
                ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
                break;
            }
            catch (Exception e) {
                handleConsumerException(e);
            }
            catch (Error e) { // NOSONAR - rethrown
                Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
                if (runnable != null) {
                    runnable.run();
                }
                this.logger.error("Stopping container due to an Error", e);
                wrapUp();
                throw e;
            }
        }
Run Code Online (Sandbox Code Playgroud)

KafkaConsumer.poll()被称为在pollAndInvoke();

  • 当我将属性 `spring.kafka.listener.concurrency` 设置为 4 时,会实例化 4 个消费者,还是 4 个侦听器? (2认同)