我们如何为Spring Cloud Stream Kafka生产者、消费者和KStreams中的模式配置value.subject.name.strategy?

cod*_*ent 7 avro apache-kafka spring-cloud-stream apache-kafka-streams confluent-schema-registry

我想自定义 Spring Cloud Stream Producers、Consumers 和 KStreams 中 Avro 模式主题的命名策略。

key.subject.name.strategy这将在 Kafka 中通过以下属性完成value.subject.name.strategy-> https://docs.confluence.io/current/schema-registry/serializer-formatter.html#subject-name-strategy

在原生 Kafka Producer 中,这是有效的:


private val producer: KafkaProducer<Int, Customer>

    init {
        val props = Properties()
        ...
        props[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
        props[AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY] = TopicRecordNameStrategy::class.java.name
        producer = KafkaProducer(props)
    }

    fun sendCustomerEvent(customer: Customer) {
        val record: ProducerRecord<Int, Customer> = ProducerRecord("customer", customer.id, customer)
        producer.send(record)
    }
Run Code Online (Sandbox Code Playgroud)

但是我找不到如何在 Spring Cloud Stream 中执行此操作。到目前为止,我已经在制作人中尝试过了:

spring:
  application:
    name: spring-boot-customer-service
  cloud:
    stream:
      kafka:
        bindings:
          output:
            producer:
              configuration:
                key:
                  serializer: org.apache.kafka.common.serialization.IntegerSerializer
                value:
                  subject:
                    name:
                      strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
Run Code Online (Sandbox Code Playgroud)

显然,Spring Cloud 使用它自己的主题命名策略以及接口org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy和只有一个子类:DefaultSubjectNamingStrategy

是否有声明性的配置方式,value.subject.name.strategy或者我们是否期望提供我们自己的org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy实现和属性spring.cloud.stream.schema.avro.subject-naming-strategy

cod*_*ent 5

正如另一个答案中指出的那样,有一个专用属性,它允许为 Kafka 生产者spring.cloud.stream.schema.avro.subjectNamingStrategy设置不同的命名策略。

我贡献了org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy提供开箱即用的功能。

对于Kafka Streams和本机序列化/反序列化(Spring Cloud Streams 3.0.0+ 的默认行为),您必须使用 Confluence 的实现 ( io.confluent.kafka.serializers.subject.RecordNameStrategy) 和本机属性:

spring:
  application:
    name: shipping-service
  cloud:
    stream:
      ...
      kafka:
        streams:
          binder:
            configuration:
              application:
                id: shipping-service
              ...
              value:
                subject:
                  name:
                    strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy 
Run Code Online (Sandbox Code Playgroud)