Cli*_*don 5 spring-boot apache-kafka-streams spring-kafka
我正在尝试将我当前用 Java 代码编写的 spring-kafka 应用程序的配置外部化。我应该将ProducerConfig和ConsumerConfig值放入,还是如果我通过和spring.kafka.streams.properties提供它们,它们是否会被正确配置?spring.kafka.producerspring.kafka.consumer
到目前为止,我似乎应该将所有配置放入一个 bean 类型中KafkaStreamsConfiguration,以便配置我的 kafka-streams 应用程序。目前,我通过直接在代码中设置ProducerConfig和ConsumerConfig值来做到这一点。
当我外部化此配置时,似乎在文件中设置属性值ProducerConfig与spring-boot 创建的属性值无关(我通过在某处自动装配配置并查看它来确认这一点)。ConsumerConfigapplication.propertiesKafkaStreamsConfiguration
如果我改为提供ProducerConfig和ConsumerConfig值,spring.kafka.streams.properties它们会显示在KafkaStreamsConfiguration.
这是我的旧 Java 配置:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName());
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
props.put("replication.factor", replicationFactor);
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "600000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new KafkaStreamsConfiguration(props);
}
Run Code Online (Sandbox Code Playgroud)
这最终导致ProducerConfig和ConsumerConfig值不在KafkaStreamsConfiguration运行时:
spring.kafka.streams.bootstrap-servers=localhost:9092
spring.kafka.streams.properties.schema.registry.url=http://localhost:8081
spring.kafka.streams.application-id=<application_id>
spring.kafka.consumer.group-id=<group_id> #this won't show up in KafkaStreamsConfiguration
spring.kafka.streams.replication-factor=1
spring.kafka.streams.properties.commit.interval.ms=100
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
spring.kafka.producer.compression-type=lz4 #this won't show up in KafkaStreamsConfiguration
spring.kafka.streams.properties.state.dir=/var/lib/kafka-streams
spring.kafka.streams.properties.state.cleanup.delay.ms=600000
spring.kafka.consumer.auto-offset-reset=latest #this won't show up in KafkaStreamsConfiguration
spring.kafka.streams.properties.timestamp.extractor=org.apache.kafka.streams.processor.WallclockTimestampExtractor
Run Code Online (Sandbox Code Playgroud)
然而,这确实会KafkaStreamsConfiguration产生预期的值:
spring.kafka.streams.bootstrap-servers=localhost:9092
spring.kafka.streams.properties.schema.registry.url=http://localhost:8081
spring.kafka.streams.application-id=<application_id>
spring.kafka.streams.properties.group-id=<group_id> #this shows up in KafkaStreamsConfiguration
spring.kafka.streams.replication-factor=1
spring.kafka.streams.properties.commit.interval.ms=100
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
spring.kafka.streams.properties.compression-type=lz4 #this shows up in KafkaStreamsConfiguration
spring.kafka.streams.properties.state.dir=/var/lib/kafka-streams
spring.kafka.streams.properties.state.cleanup.delay.ms=600000
spring.kafka.streams.properties.auto-offset-reset=latest #this shows up in KafkaStreamsConfiguration
spring.kafka.streams.properties.timestamp.extractor=org.apache.kafka.streams.processor.WallclockTimestampExtractor
Run Code Online (Sandbox Code Playgroud)
我期望ProducerConfig和值分别通过和设置时ConsumerConfig传播到。特别是因为我在 IntelliJ 中获得了 Intellisense,用于.KafkaStreamsConfigurationspring.kafka.producerspring.kafka.consumerapplication.properties
也就是说,我是否需要确保将这些设置完成spring.kafka.streams.properties才能正确配置应用程序?
spring.kafka.consumer.group-id=<group_id> #this won't show up in KafkaStreamsConfiguration
Streams 将 设为该group.id属性application.id。
公共静态最终字符串APPLICATION_ID_CONFIG =“application.id”;
private static Final String APPLICATION_ID_DOC = "流处理应用程序的标识符。在 Kafka 集群中必须是唯一的。它用作 1) 默认客户端 ID 前缀,2) 用于成员管理的组 ID,3) 变更日志主题前缀。";
看KafkaProperties。
streams,producer并且consumer属性是不同且不相关的。
spring.kafka.producer.compression-type=lz4 #this won't show up in KafkaStreamsConfiguration
compression-type不作为流的第一类启动属性公开。您可以使用它来设置
spring.kafka.streams.properties.compression.type=gzip
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5846 次 |
| 最近记录: |