cod*_*ent 7 avro apache-kafka spring-cloud-stream apache-kafka-streams confluent-schema-registry
我想自定义 Spring Cloud Stream Producers、Consumers 和 KStreams 中 Avro 模式主题的命名策略。
key.subject.name.strategy这将在 Kafka 中通过以下属性完成value.subject.name.strategy-> https://docs.confluence.io/current/schema-registry/serializer-formatter.html#subject-name-strategy
在原生 Kafka Producer 中,这是有效的:
private val producer: KafkaProducer<Int, Customer>
init {
val props = Properties()
...
props[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
props[AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY] = TopicRecordNameStrategy::class.java.name
producer = KafkaProducer(props)
}
fun sendCustomerEvent(customer: Customer) {
val record: ProducerRecord<Int, Customer> = ProducerRecord("customer", customer.id, customer)
producer.send(record)
}
Run Code Online (Sandbox Code Playgroud)
但是我找不到如何在 Spring Cloud Stream 中执行此操作。到目前为止,我已经在制作人中尝试过了:
spring:
application:
name: spring-boot-customer-service
cloud:
stream:
kafka:
bindings:
output:
producer:
configuration:
key:
serializer: org.apache.kafka.common.serialization.IntegerSerializer
value:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
Run Code Online (Sandbox Code Playgroud)
显然,Spring Cloud 使用它自己的主题命名策略以及接口org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy和只有一个子类:DefaultSubjectNamingStrategy。
是否有声明性的配置方式,value.subject.name.strategy或者我们是否期望提供我们自己的org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy实现和属性spring.cloud.stream.schema.avro.subject-naming-strategy?
正如另一个答案中指出的那样,有一个专用属性,它允许为 Kafka 生产者spring.cloud.stream.schema.avro.subjectNamingStrategy设置不同的命名策略。
我贡献了org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy提供开箱即用的功能。
对于Kafka Streams和本机序列化/反序列化(Spring Cloud Streams 3.0.0+ 的默认行为),您必须使用 Confluence 的实现 ( io.confluent.kafka.serializers.subject.RecordNameStrategy) 和本机属性:
spring:
application:
name: shipping-service
cloud:
stream:
...
kafka:
streams:
binder:
configuration:
application:
id: shipping-service
...
value:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
13508 次 |
| 最近记录: |