对于使用通用json消息的Spring云流kafka绑定器是否有一个很好的示例

sw-*_*ast 1 java spring apache-kafka spring-cloud-stream

我是Spring cloud Stream和Kafka的新手,我正在寻找一个从kafka主题中消费json消息的好例子.

谢谢

Jav*_*inz 6

您可以参考spring-cloud-stream团队中示例(接收器和源项目)

要让应用程序运行,有三种情况.

如果消费者和生产者是Spring-Cloud-Stream(SCS)应用程序,您只需要设置content-typeapplication/json.

#Specific channel
spring.cloud.stream.bindings.<channelName>.consumer.contentType=application/json
#For all channels
spring.cloud.stream.default.contentType=application/json
Run Code Online (Sandbox Code Playgroud)

第二种情况是你的制片人是不是SCS和你的消费者是一个SCS,SCS默认添加嵌入在净荷头,所以你需要禁用该行为将headerMode作为raw与一起contentType.

#Specific channel
spring.cloud.stream.bindings.<channelName>.consumer.headerMode=raw
#For all channels
spring.cloud.stream.default.consumer.headerMode=raw
Run Code Online (Sandbox Code Playgroud)

在第三种情况下是SCS生产者而不是SCS使用者,在这种情况下,您需要使用application/octet-streamas作为contentType因为SCS不支持String的原始头(存在问题),因此您需要将有效负载作为字节发送

#Properties
spring.cloud.stream.default.contentType=application/octet-stream
spring.cloud.stream.default.producer.headerMode=raw

//Java
byte[] payload = jacksonObjectMapper.writeValueAsBytes(entity);
return channel.send(MessageBuilder.withPayload(payload).build());
Run Code Online (Sandbox Code Playgroud)