Python AVRO reader returns AssertionError when decoding Kafka message

Mat*_*rna 6 python avro kafka-python

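I'm new to Kafka and AVRO and just playing around with them. I am trying to deserialize AVRO messages in Python 3.7.3 using the kafka-python and avro-python3 packages, following this answer.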

The function responsible for decoding the Kafka messages is

def decode_message(msg_value, reader):
    from io import BytesIO
    from avro.io import BinaryDecoder

    message_bytes = BytesIO(msg_value)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)

    return event_dict

where reader is an avro.io.DatumReader instance, created by:

def create_reader(filename_path):
    from avro.io import DatumReader
    import avro.schema

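    # Use a context manager so the schema file is closed after reading
    with open(filename_path) as schema_file:
        schema = avro.schema.Parse(schema_file.read())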
    reader = DatumReader(schema)

    return reader
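Note that decode_message hands msg_value straight to BinaryDecoder, so DatumReader expects the bytes to be one bare, schemaless datum with no header. Per the Avro specification, a long is zigzag-encoded and written as a base-128 varint, a string is its byte length (as a long) followed by the UTF-8 bytes, and a union value is prefixed by the long index of its branch. The sketch below builds such a datum for the tracks schema using only the standard library; encode_long and encode_string are hypothetical helpers written for illustration, not avro-python3 API, and the field values are copied from the message bytes shown further down.

```python
def encode_long(n: int) -> bytes:
    """Encode an int as an Avro long: zigzag, then little-endian base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # zigzag: small magnitudes map to small unsigned values
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # low 7 bits, continuation bit set
        z >>= 7
    out.append(z)  # final byte, continuation bit clear
    return bytes(out)


def encode_string(s: str) -> bytes:
    """Encode an Avro string: byte length as a long, then the UTF-8 bytes."""
    data = s.encode("utf-8")
    return encode_long(len(data)) + data


# A bare "tracks" datum: field "name", then the "data" union with branch 0
# (track_upload_verified), whose fields are track_id, audio_filename, track_type.
datum = (
    encode_string("track_upload_verified")
    + encode_long(0)  # union branch index
    + encode_string("341aa6a3-5ecb-4ac0-8f27-bc2fe5abc9d4")
    + encode_string("tracks-audio/-1khgyI4kYfSf8hq2XiXZjg-1569510465")
    + encode_string("main")
)
print(len(datum), datum[:23])  # prints: 113 b'*track_upload_verified\x00'
```

The resulting 113 bytes match, byte for byte, the data that follows the header in the message shown below; this bare datum is the shape of input that decode_message, as written, would need to receive.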

Unfortunately, it fails. Here is the traceback:

<_io.BytesIO object at 0x7fab73fe5530>
<avro.io.BinaryDecoder object at 0x7fab74300090>
Traceback (most recent call last):
File "app.py", line 19, in <module>
  kfk.read_messages(kafka_consumer, avro_reader)
File "/app/modules/consume_kafka.py", line 17, in read_messages
  decoded_message = decode_message(msg_value, reader)
File "/app/modules/consume_kafka.py", line 50, in decode_message
  event_dict = reader.read(decoder)
File "/usr/local/lib/python3.7/site-packages/avro/io.py", line 489, in read
  return self.read_data(self.writer_schema, self.reader_schema, decoder)
File "/usr/local/lib/python3.7/site-packages/avro/io.py", line 534, in read_data
  return self.read_record(writer_schema, reader_schema, decoder)
File "/usr/local/lib/python3.7/site-packages/avro/io.py", line 734, in read_record
  field_val = self.read_data(field.type, readers_field.type, decoder)
File "/usr/local/lib/python3.7/site-packages/avro/io.py", line 512, in read_data
  return decoder.read_utf8()
File "/usr/local/lib/python3.7/site-packages/avro/io.py", line 257, in read_utf8
  input_bytes = self.read_bytes()
File "/usr/local/lib/python3.7/site-packages/avro/io.py", line 249, in read_bytes
  assert (nbytes >= 0), nbytes
AssertionError: -40
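The -40 can be reproduced by hand. The tracks record's first field, name, is a string, so read_bytes starts by reading a zigzag-encoded varint length. The payload, however, begins with b"Obj\x01": its first byte is 'O' (0x4F = 79), which has no continuation bit, and zigzag-decoding 79 gives (79 >> 1) ^ -(79 & 1) = 39 ^ -1 = -40, the negative length that trips the assertion. A stdlib sketch of that decoding follows; read_long is a hypothetical helper following the Avro specification, not avro-python3's own implementation.

```python
from typing import Tuple


def read_long(buf: bytes, pos: int = 0) -> Tuple[int, int]:
    """Decode one Avro long (base-128 varint, then zigzag) starting at buf[pos].

    Returns (value, position just past the encoded long)."""
    acc = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        acc |= (byte & 0x7F) << shift  # accumulate 7 payload bits
        if not byte & 0x80:  # continuation bit clear: this was the last byte
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1), pos  # undo the zigzag mapping


# First bytes of the Kafka message value from the question.
payload = b"Obj\x01\x04\x14avro.codec\x08null"
length, _ = read_long(payload)
print(length)  # -40, the value reported by "AssertionError: -40"
```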

I am able to read the raw message, which looks like

b'Obj\x01\x04\x14avro.codec\x08null\x16avro.schema\xbe\t{"type":"record","name":"tracks","namespace":"integration","fields":[{"name":"name","type":"string"},{"name":"data","type":[{"type":"record","name":"track_upload_verified","namespace":"integration.tracks","fields":[{"name":"track_id","type":"string"},{"name":"audio_filename","type":"string"},{"name":"track_type","type":"string"}]},{"type":"record","name":"audio_processed","namespace":"integration.tracks","fields":[{"name":"track_id","type":"string"},{"name":"audio_mp3_filename","type":"string"},{"name":"waveform_samples","type":{"type":"array","items":"int"}},{"name":"duration","type":"string"}]}]}]}\x00\xc4\x8ad\xceF\x9c\xef\x99\n}#y7\x96\xba\xb4\x02\xe2\x01*track_upload_verified\x00H341aa6a3-5ecb-4ac0-8f27-bc2fe5abc9d4^tracks-audio/-1khgyI4kYfSf8hq2XiXZjg-1569510465\x08main\xc4\x8ad\xceF\x9c\xef\x99\n}#y7\x96\xba\xb4'

which is what I expected, i.e. bytes.
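One detail in these bytes may explain the failure: the value starts with the magic b"Obj\x01", followed by avro.codec and avro.schema metadata and a 16-byte sync marker. That is the header of an Avro object container file as defined in the Avro specification, not a bare datum. The minimal stdlib sketch below parses such a header; parse_container_header and read_long are hypothetical helpers written for illustration, and the demo input is a tiny synthetic header with the same layout, not the real message.

```python
from typing import Dict, Tuple

MAGIC = b"Obj\x01"  # Avro object container file magic


def read_long(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode one Avro long (varint + zigzag); return (value, new position)."""
    acc, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        acc |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1), pos


def parse_container_header(buf: bytes) -> Tuple[Dict[str, bytes], bytes, int]:
    """Parse a container file header.

    Returns (metadata map, 16-byte sync marker, offset of the first data block)."""
    if buf[:4] != MAGIC:
        raise ValueError("not an Avro object container file")
    pos = 4
    meta: Dict[str, bytes] = {}
    while True:  # metadata is an Avro map<bytes>, written as counted blocks
        count, pos = read_long(buf, pos)
        if count == 0:  # a zero count ends the map
            break
        if count < 0:  # negative count: a block byte size follows, skip it
            count = -count
            _, pos = read_long(buf, pos)
        for _ in range(count):
            key_len, pos = read_long(buf, pos)
            key = buf[pos:pos + key_len].decode("utf-8")
            pos += key_len
            val_len, pos = read_long(buf, pos)
            meta[key] = buf[pos:pos + val_len]
            pos += val_len
    sync = buf[pos:pos + 16]
    return meta, sync, pos + 16


# Synthetic header: codec "null", placeholder schema "null", made-up sync marker.
demo = (b"Obj\x01\x04\x14avro.codec\x08null\x16avro.schema\x0c\"null\"\x00"
        + bytes(range(16)))
meta, sync, offset = parse_container_header(demo)
print(meta, offset)  # {'avro.codec': b'null', 'avro.schema': b'"null"'} 57
```

Applied to the message above, this should return avro.codec = b"null", the embedded tracks schema, and an offset pointing at the first data block. If the producer is indeed writing container files, avro-python3's avro.datafile.DataFileReader, which consumes this header itself, may be a better fit than a bare DatumReader over a BinaryDecoder.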

I am quite sure about the schema as I validated it using this tool.

Has anyone had a similar issue?

Ben*_*zar 0

It seems the Avro file is corrupted, but that is hard to tell from your question and input.

Please try the following:

  1. Install aws_schema_registry
  2. Import AvroSchema: from aws_schema_registry.avro import AvroSchema
  3. Initialize it: schema = AvroSchema(my_schema)
  4. Now consume the byte events from Kafka: result = schema.read(message)