SpringBoot Kafka Consumer:无法启动bean内部KafkaListenerEndpointRegistry TimeoutException

Ora*_*nge 3 java apache-kafka spring-boot kafka-consumer-api spring-kafka

我有一个使用 kafka 的 spring boot 应用程序,但我无法启动它,因为我刚刚实现了一个正在监听离线服务器的 kafka 消费者。当我开始时我得到一些:

org.springframework.context.ApplicationContextException:无法启动bean“org.springframework.kafka.config.internalKafkaListenerEndpointRegistry”;嵌套异常是 org.apache.kafka.common.errors.TimeoutException:获取主题元数据时超时已过期

因为卡夫卡宕机了。

如何配置 spring boot 应用程序,即使 kafka 服务器关闭也能启动?

下面是我的 Kafka Consumer 配置:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${app.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${HOSTNAME:NO_HOSTNAME}")
    private String groupId;

    @Value(value = "${spring.profiles.active}")
    private String activeSpringProfile;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,String.format("RANDOM_GROUP_ID_%s_%s", groupId, RandomUtils.nextInt()));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Run Code Online (Sandbox Code Playgroud)

谢谢你!

Gar*_*ell 5

在最新版本中,missingTopicsFatal容器属性为true,这就是导致此问题的原因。你可以把它关掉...

@Component
class ContainerFactoryConfigurer {

    ContainerFactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
        factory.getContainerProperties().setMissingTopicsFatal(false);
    }

}
Run Code Online (Sandbox Code Playgroud)