小编S. *_*ood的帖子

无法使用 KafkaNull 作为值发送消息

我正在使用主题上的日志压缩构建 Kafka 应用程序,但我无法发送墓碑值 (KafkaNull)

我曾尝试使用序列化程序的默认配置,当它不起作用时,我使用了“使用原始标头发布 null/tombstone 消息”中的建议更改将 application.properties 设置为:

spring.cloud.stream.output.producer.useNativeEncoding=true
spring.cloud.stream.kafka.binder.configuration.value.serializer=org.springframework.kafka.support.serializer.JsonSerializer
Run Code Online (Sandbox Code Playgroud)

我必须向流发送消息的代码是

this.stockTopics.compactedStocks().send(MessageBuilder
    .withPayload(KafkaNull.INSTANCE)
    .setHeader(KafkaHeaders.MESSAGE_KEY,company.getBytes())
    .build())
Run Code Online (Sandbox Code Playgroud)

this.stopTopics.compactedStocks() 返回一个我可以向其发送消息的 messageStream。

每次我尝试使用 KafkaNull 实例作为有效负载发送该消息时,我都会收到错误消息 Failed to convert message: 'GenericMessage [payload=org.springframework.kafka.support.KafkaNull@1c2d8163, headers={id=f81857e7-fbd0-56f5-8418-6a1944e7f2b1, kafka_messageKey=[B@36ec022a, contentType=application/json, timestamp=1547827957485}]' to outbound message.

我希望消息简单地以空值发送给消费者,但显然它是错误的。

apache-kafka spring-cloud-stream

2
推荐指数
1
解决办法
1155
查看次数

标签 统计

apache-kafka ×1

spring-cloud-stream ×1