@KafkaListener 启动问题(Spring)

Enb*_*irr 3 spring kotlin spring-kafka

需要什么

我正在编写一个使用 Kafka 获取信息的应用程序(Spring + Kotlin)。如果我在声明@KafkaListener时设置autoStartup = "true"那么应用程序可以正常工作,但前提是代理可用。当代理不可用时,应用程序在启动时崩溃。这是不受欢迎的行为。应用程序必须工作并执行其他功能。

我尝试做什么

为了避免启动时应用程序崩溃,此站点上的另一个主题建议在声明@KafkaListener时设置autoStartup = "false"。它确实有助于防止启动时崩溃。但现在我无法成功手动启动KafkaListener。在其他示例中,我看到了KafkaListenerEndpointRegistry的自动连接,但是当我尝试这样做时:

@Service
class KafkaConsumer @Autowired constructor(
        private val kafkaListenerEndpointRegistry: KafkaListenerEndpointRegistry
) {
Run Code Online (Sandbox Code Playgroud)

IntelliJ Idea 警告:

无法自动装配。未找到“KafkaListenerEndpointRegistry”类型的 bean。

当我尝试在不自动装配的情况下使用 KafkaListenerEndpointRegistry 并执行以下代码时:

@Service
class KafkaConsumer {
    private val logger = LoggerFactory.getLogger(this::class.java)
    private val kafkaListenerEndpointRegistry = KafkaListenerEndpointRegistry()

    @Scheduled(fixedDelay = 10000)
    fun startCpguListener(){
        val container = kafkaListenerEndpointRegistry.getListenerContainer("consumer1")
        if (!container.isRunning)
            try {
                logger.info("Kafka Consumer is not running. Trying to start...")
                container.start()
            } catch (e: Exception){
                logger.error(e.message)
            }
    }

    @KafkaListener(
            id = "consumer1",
            topics = ["cpgdb.public.user"],
            autoStartup = "false"
    )
    private fun listen(it: ConsumerRecord<JsonNode, JsonNode>, qwe: Consumer<Any, Any>){
        val pay = it.value().get("payload")
        val after = pay.get("after")
        val id = after["id"].asInt()
        
        val receivedUser = CpguUser(
                id = id,
                name = after["name"].asText()
        ) 
        logger.info("received user with id = $id")
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

kafkaListenerEndpointRegistry.getListenerContainer("consumer1")总是返回 null。我想这是因为我没有自动连接 kafkaListenerEndpointRegistry。我该怎么做?或者,如果我的答案存在另一种解决方案,我将不胜感激任何帮助!谢谢!

有卡夫卡配置:

@Configuration
@EnableConfigurationProperties(KafkaProperties::class)
class KafkaConfiguration(private val props: KafkaProperties) {

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<Any, Any> {
        val factory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
        factory.consumerFactory = consumerFactory()
        factory.setConcurrency(1)
        factory.setMessageConverter(MessagingMessageConverter())
        factory.setStatefulRetry(true)

        val retryTemplate = RetryTemplate()
        retryTemplate.setRetryPolicy(AlwaysRetryPolicy())
        retryTemplate.setBackOffPolicy(ExponentialBackOffPolicy())
        factory.setRetryTemplate(retryTemplate)
        val handler = SeekToCurrentErrorHandler()
        handler.isAckAfterHandle = false
        factory.setErrorHandler(handler)
        factory.containerProperties.isMissingTopicsFatal = false

        return factory
    }

    @Bean
    fun consumerFactory(): ConsumerFactory<Any, Any> {
        return DefaultKafkaConsumerFactory(consumerConfigs())
    }

    @Bean
    fun consumerConfigs(): Map<String, Any> {
        return mapOf(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to props.bootstrap.address,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG to listOf(MonitoringConsumerInterceptor::class.java),
                ConsumerConfig.CLIENT_ID_CONFIG to props.receiver.clientId,
                ConsumerConfig.GROUP_ID_CONFIG to props.receiver.groupId,
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
                ConsumerConfig.ISOLATION_LEVEL_CONFIG to "read_committed",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true
        )
    }
}
Run Code Online (Sandbox Code Playgroud)
  • 春季启动版本:2.3.0
  • 弹簧卡夫卡版本:2.5.3
  • 卡夫卡客户端版本:2.5.0