Spring Cloud Stream with Avro 无法正确转换字符串消息

ccs*_*hih 0 spring-cloud-stream

我遇到一个问题,源发送 GenericMessage [payload=xxxxx, ...],而接收器接收消息为 10,120,120,120,120,120。

这个问题是在我设置 Avro 消息转换器后发生的。如果我删除 Avro 消息转换器并使用 StreamListener 来处理消息转换,它将正常工作。

源应用程序.properties

spring.cloud.stream.bindings.toGreeting.destination=greeting
spring.cloud.stream.bindings.toGreeting.contentType=application/*+avro
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true
Run Code Online (Sandbox Code Playgroud)

水槽应用

server.port=8990 
spring.cloud.stream.bindings.greeting.destination=greeting
Run Code Online (Sandbox Code Playgroud)

消息转换器

@Configuration
@EnableSchemaRegistryClient
public class MessageConverterConfig {
    @Bean
    public MessageConverter topic1MessageConverter() throws IOException {
        return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
    }
}
Run Code Online (Sandbox Code Playgroud)

应用类

@SpringBootApplication
@EnableSchemaRegistryClient
public class SourceApplication {
    public static void main(String[] args) {
        SpringApplication.run(SourceApplication.class, args);
    }
}

@EnableSchemaRegistryServer
@EnableSchemaRegistryClient
@SpringBootApplication
public class SinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(SinkApplication.class, args);
    }
}
Run Code Online (Sandbox Code Playgroud)

我缺少配置吗?谢谢。

Vin*_*lho 5

这是一个简单的规则:

如果您只想拥有一个可以从 avro 序列化/反序列化的消息转换器,并且您可以在 GenericRecords 配置期间提供架构位置,或者您的 StreamListener 方法具有 SpecificRecord 类型的签名。然后选择AvroSchemaMessageConverter,像您一样进行设置,但avro/bytes改为使用。我们保留application/*+avro对模式演化的支持。

因此,如果您进行设置,@EnableSchemaRegistryClient那么您将委托给外部注册表来拥有您的架构。在这种情况下,您不仅需要注册表,还需要在那里注册的模式。

默认情况下,生产者将自动注册 SpecificRecord/GenericRecord 或 Pojos 类型的任何有效负载(如果spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled启用)。

在这种情况下,制作者实际上会将标头设置为假设application/vnd.user.v1+avro您的主题是用户并且它是第一个版本。

在下游,如果您的消费者也配置了application/*+avrocontentType,他们将能够读取此 contentType 并推断其主题/版本,以查询架构服务器并检索适当的架构。