我正在编写一个使用 Kafka 获取信息的应用程序(Spring + Kotlin)。如果我在声明@KafkaListener时设置autoStartup = "true"那么应用程序可以正常工作,但前提是代理可用。当代理不可用时,应用程序在启动时崩溃。这是不受欢迎的行为。应用程序必须工作并执行其他功能。
为了避免启动时应用程序崩溃,此站点上的另一个主题建议在声明@KafkaListener时设置autoStartup = "false"。它确实有助于防止启动时崩溃。但现在我无法成功手动启动KafkaListener。在其他示例中,我看到了KafkaListenerEndpointRegistry的自动连接,但是当我尝试这样做时:
@Service
class KafkaConsumer @Autowired constructor(
private val kafkaListenerEndpointRegistry: KafkaListenerEndpointRegistry
) {
Run Code Online (Sandbox Code Playgroud)
IntelliJ Idea 警告:
无法自动装配。未找到“KafkaListenerEndpointRegistry”类型的 bean。
当我尝试在不自动装配的情况下使用 KafkaListenerEndpointRegistry 并执行以下代码时:
@Service
class KafkaConsumer {
private val logger = LoggerFactory.getLogger(this::class.java)
private val kafkaListenerEndpointRegistry = KafkaListenerEndpointRegistry()
@Scheduled(fixedDelay = 10000)
fun startCpguListener(){
val container = kafkaListenerEndpointRegistry.getListenerContainer("consumer1")
if (!container.isRunning)
try {
logger.info("Kafka Consumer is not running. Trying to start...")
container.start()
} catch (e: Exception){ …
Run Code Online (Sandbox Code Playgroud)