ime*_*ehl 2 go avro apache-kafka kafka-consumer-api
我试图以avro格式消费Kafka消息,但我无法解码Go中从avro到json的消息.
我正在使用Confluent平台(3.0.1).例如,我生成avro消息,如:
kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1":"message1"}
{"f1":"message2"}
Run Code Online (Sandbox Code Playgroud)
现在我用go Kafka libary:sarama消费消息.纯文本消息正常工作.必须解码Avro消息.我发现了不同的库:github.com/linkedin/goavro,github.com/elodina/go-avro
但解码后我得到一个没有值的json(两个libs):
{"f1":""}
Run Code Online (Sandbox Code Playgroud)
goavro:
avroSchema := `
{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}
`
codec, err := goavro.NewCodec(avroSchema)
if err != nil {
log.Fatal(err)
}
bb := bytes.NewBuffer(msg.Value)
decoded, err := codec.Decode(bb)
log.Println(fmt.Sprintf("%s", decoded))
Run Code Online (Sandbox Code Playgroud)
去-Avro公司:
schema := avro.MustParseSchema(avroSchema)
reader := avro.NewGenericDatumReader()
reader.SetSchema(schema)
decoder := avro.NewBinaryDecoder(msg.Value)
decodedRecord := avro.NewGenericRecord(schema)
log.Println(decodedRecord.String())
Run Code Online (Sandbox Code Playgroud)
msg = sarama.ConsumerMessage
刚刚发现(通过比较二进制 avro 消息)我必须删除消息字节数组的前 5 个元素 - 现在一切正常:)
message = msg.Value[5:]
Run Code Online (Sandbox Code Playgroud)
也许有人可以解释为什么