带解码器问题的Kafka Avro Consumer

Spa*_*oat 8 java avro apache-kafka kafka-consumer-api apache-nifi

当我尝试使用我的相应模式使用Avro运行Kafka Consumer时,它返回错误"AvroRuntimeException:格式错误的数据.长度为负:-40".我看到其他人有类似的问题,将字节数组转换为json,Avro写入和读取,以及Kafka Avro Binary*编码器.我也引用了这个消费者组示例,它们都很有帮助,但到目前为止这个错误没有任何帮助..它可以工作到这部分代码(第73行)

解码器解码器= DecoderFactory.get().binaryDecoder(byteArrayInputStream,null);

我已经尝试了其他解码器并打印出byteArrayInputStream变量的内容,它看起来我相信你会期望序列化的avro数据看起来(在消息中我可以看到模式和一些数据以及一些格式错误的数据)我打印出来了使用.available()方法可用的字节,返回594.我无法理解为什么会发生此错误.Apache Nifi用于生成具有来自hdfs的相同模式的Kafka流.我将不胜感激任何帮助.

Mic*_*oll 17

也许问题是Nifi如何编写(编码)Avro数据与消费者应用程序读取(解码)数据的方式不匹配.

简而言之,Avro的API提供了两种不同的序列化方法:

  1. 用于创建正确的Avro 文件:编码数据记录,但也将Avro架构嵌入到一种前导码(via org.apache.avro.file.{DataFileWriter/DataFileReader})中.嵌入架构成的Avro文件使得很多的意义,因为(一)一般的的Avro公司文件"有效载荷"的幅度比嵌入式的Avro模式和(b)您可以再复制或在你的心脏的内容中移动这些文件更大的订单并且仍然可以确保您可以再次阅读它们,而无需咨询某人或某事.
  2. 仅编码数据记录,即不嵌入模式(通过org.apache.avro.io.{BinaryEncoder/BinaryDecoder};注意包名称的差异:io此处与file上述相比).编码的Avro-时被写入到卡夫卡的话题,例如邮件,因为相对于变体1上面,您不会再嵌入的Avro模式的开销到每一个消息,假设这种方法通常有利于您的(非常合理)策略是,对于相同的Kafka主题,消息使用相同的Avro架构进行格式化/编码.这是一个显着的优点,因为在流数据上下文中,动态数据记录通常比如上所述的静态数据Avro文件小得多(通常在100字节到几百KB之间)(通常是数百或者数千MB); 所以Avro架构的大小相对较大,因此在将2000个数据记录写入Kafka时,您不希望将其嵌入2000x.其缺点是,你必须"以某种方式"追踪模式Avro公司如何映射到卡夫卡的主题-或者更准确地说,你必须以某种方式跟踪与Avro的模式的消息而不下降直接嵌入模式的路径进行编码.好消息是Kafka生态系统(Avro架构注册表)中有工具可用于透明地执行此操作.因此,与变体1相比,变体2以便利性为代价获得了效率.

结果是,编码的Avro数据的"有线格式"看起来会有所不同,具体取决于您使用上面的(1)还是(2).

我不是很熟悉Apache Nifi,但快速查看源代码(例如ConvertAvroToJSON.java)建议,我认为它是使用变体1,即它嵌入Avro的模式旁边的Avro的记录.但是,您的使用者代码使用了DecoderFactory.get().binaryDecoder()变体2(没有嵌入模式).

也许这解释了你遇到的错误?

  • 谢谢@miguno,就是这样!我正在使用解码器摇摆和滚动到 DataFileReader,并进行两行更改。DatumReader<GenericRecord> datumReader = new SpecificDatumReader<GenericRecord>(schema); DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(inputStream, datumReader); (2认同)