小编Cli*_*don的帖子

如何在属性文件中正确外部化 spring-boot kafka-streams 配置?

我正在尝试将我当前用 Java 代码编写的 spring-kafka 应用程序的配置外部化。我应该将ProducerConfigConsumerConfig值放入,还是如果我通过和spring.kafka.streams.properties提供它们,它们是否会被正确配置?spring.kafka.producerspring.kafka.consumer

到目前为止,我似乎应该将所有配置放入一个 bean 类型中KafkaStreamsConfiguration,以便配置我的 kafka-streams 应用程序。目前,我通过直接在代码中设置ProducerConfigConsumerConfig值来做到这一点。

当我外部化此配置时,似乎在文件中设置属性值ProducerConfig与spring-boot 创建的属性值无关(我通过在某处自动装配配置并查看它来确认这一点)。ConsumerConfigapplication.propertiesKafkaStreamsConfiguration

如果我改为提供ProducerConfigConsumerConfig值,spring.kafka.streams.properties它们会显示在KafkaStreamsConfiguration.

这是我的旧 Java 配置:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();

        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName());
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        props.put("replication.factor", replicationFactor);
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams"); …
Run Code Online (Sandbox Code Playgroud)

spring-boot apache-kafka-streams spring-kafka

5
推荐指数
1
解决办法
5846
查看次数