Avro schema evolution with enums: deserialization crashes

Ber*_*ram 5 avro

I have defined two versions of a record in two separate Avro schema (.avsc) files. I use namespaces to distinguish the versions.

SimpleV1.avsc

{
  "type" : "record",
  "name" : "Simple",
  "namespace" : "test.simple.v1",
  "fields" : [
      {
        "name" : "name",
        "type" : "string"
      },
      {
        "name" : "status",
        "type" : {
          "type" : "enum",
          "name" : "Status",
          "symbols" : [ "ON", "OFF" ]
        },
        "default" : "ON"
      }
   ]
}

Example JSON:

{"name":"A","status":"ON"}

Version 2 simply adds a description field with a default value.


SimpleV2.avsc

{
  "type" : "record",
  "name" : "Simple",
  "namespace" : "test.simple.v2",
  "fields" : [
      {
        "name" : "name",
        "type" : "string"
      },
      {
        "name" : "description",
        "type" : "string",
        "default" : ""
      },
      {
        "name" : "status",
        "type" : {
          "type" : "enum",
          "name" : "Status",
          "symbols" : [ "ON", "OFF" ]
        },
        "default" : "ON"
      }
   ]
}

Example JSON:

{"name":"B","description":"b","status":"ON"}

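Both schemas are compiled into generated Java classes.
In my example I am testing backward compatibility: a record written with V1 should be readable by a reader that uses V2, and I expect the default value to be filled in. This works as long as I don't use enums.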
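For reference, the behaviour expected here can be modelled in a few lines of plain Java. This is a simplified sketch, not Avro's implementation: the class RecordResolutionSketch and its ReaderField type are hypothetical, and a record is represented as a map from field name to value. Reader fields are matched to written fields by name, and a reader field that the writer did not have takes the reader schema's default.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of reader/writer resolution for records (NOT Avro code).
// Records are plain maps from field name to value.
public class RecordResolutionSketch {

    // A field of the reader schema; defaultValue == null means "no default declared" (model only).
    static final class ReaderField {
        final String name;
        final Object defaultValue;

        ReaderField(String name, Object defaultValue) {
            this.name = name;
            this.defaultValue = defaultValue;
        }
    }

    // Builds the record as the reader sees it, in reader field order.
    static Map<String, Object> resolve(Map<String, Object> written, List<ReaderField> readerFields) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (ReaderField field : readerFields) {
            if (written.containsKey(field.name)) {
                result.put(field.name, written.get(field.name)); // value written with V1
            } else if (field.defaultValue != null) {
                result.put(field.name, field.defaultValue);      // absent in V1: use the V2 default
            } else {
                throw new IllegalStateException("No value and no default for field " + field.name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Record written with V1: {"name":"A","status":"ON"}
        Map<String, Object> written = new LinkedHashMap<>();
        written.put("name", "A");
        written.put("status", "ON");

        // Fields of the V2 reader schema, in declaration order
        List<ReaderField> v2Fields = Arrays.asList(
                new ReaderField("name", null),
                new ReaderField("description", ""),
                new ReaderField("status", "ON"));

        System.out.println(resolve(written, v2Fields)); // {name=A, description=, status=ON}
    }
}
```

In this model the enum value is just carried over like any other field; nothing about its name is checked, which is the behaviour the question expects.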

package test;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.junit.Assert;

public class EnumEvolutionExample {

    public static void main(String[] args) throws IOException {
        Schema schemaV1 = new Schema.Parser().parse(new File("./src/main/resources/SimpleV1.avsc"));
        // works as well:
        // Schema schemaV1 = test.simple.v1.Simple.getClassSchema();
        Schema schemaV2 = new Schema.Parser().parse(new File("./src/main/resources/SimpleV2.avsc"));

        test.simple.v1.Simple simpleV1 = test.simple.v1.Simple.newBuilder()
                .setName("A")
                .setStatus(test.simple.v1.Status.ON)
                .build();

        // Reader schema = V2, writer schema = V1
        SchemaPairCompatibility schemaCompatibility = SchemaCompatibility.checkReaderWriterCompatibility(
                schemaV2,
                schemaV1);
        // Checks that writing with V1 and reading with V2 is compatible
        Assert.assertEquals(SchemaCompatibilityType.COMPATIBLE, schemaCompatibility.getType());

        byte[] binaryV1 = serealizeBinary(simpleV1);

        // Crashes with: AvroTypeException: Found test.simple.v1.Status, expecting test.simple.v2.Status
        test.simple.v2.Simple v2 = deSerealizeBinary(binaryV1, new test.simple.v2.Simple(), schemaV1);
    }

    public static byte[] serealizeBinary(SpecificRecord record) {
        DatumWriter<SpecificRecord> writer = new SpecificDatumWriter<>(record.getSchema());
        byte[] data = new byte[0];
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        Encoder binaryEncoder = EncoderFactory.get()
            .binaryEncoder(stream, null);
        try {
            writer.write(record, binaryEncoder);
            binaryEncoder.flush();
            data = stream.toByteArray();
        } catch (IOException e) {
            System.out.println("Serialization error: " + e.getMessage());
        }

        return data;
    }

    // 'reuse' only supplies the reader schema; 'writer' is the schema the bytes were written with
    public static <T extends SpecificRecord> T deSerealizeBinary(byte[] data, T reuse, Schema writer) {
        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
        DatumReader<T> datumReader = new SpecificDatumReader<>(writer, reuse.getSchema());
        try {
            T datum = datumReader.read(null, decoder);
            return datum;
        } catch (IOException e) {
            System.out.println("Deserialization error: " + e.getMessage());
        }
        return null;
    }

}

The checkReaderWriterCompatibility method confirms that the schemas are compatible.
But when I deserialize, I get the following exception:

Exception in thread "main" org.apache.avro.AvroTypeException: Found test.simple.v1.Status, expecting test.simple.v2.Status
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:309)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
    at org.apache.avro.io.ResolvingDecoder.readEnum(ResolvingDecoder.java:260)
    at org.apache.avro.generic.GenericDatumReader.readEnum(GenericDatumReader.java:267)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at test.EnumEvolutionExample.deSerealizeBinary(EnumEvolutionExample.java:70)
    at test.EnumEvolutionExample.main(EnumEvolutionExample.java:45)

I don't understand why Avro thinks it has a v1.Status. The namespace is not part of the encoding.
Is this a bug, or does anyone know how to get this working?
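The claim that the namespace is not encoded can be checked by hand-encoding the V1 example record. The class AvroWireSketch below is a hypothetical, stdlib-only encoder written purely for illustration (it is not Avro's BinaryEncoder). It follows the Avro binary format: a string is a zig-zag varint length followed by its UTF-8 bytes, an enum is the zig-zag varint index of its symbol, and a record is simply its fields written one after another.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical hand-rolled encoder for the V1 "Simple" record, only to show which bytes are written.
public class AvroWireSketch {

    // Zig-zag + variable-length encoding, as used for Avro int and long values.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long n = (value << 1) ^ (value >> 63); // zig-zag: small magnitudes -> small unsigned numbers
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80)); // 7 payload bits + continuation bit
            n >>>= 7;
        }
        out.write((int) n);
    }

    static byte[] encodeSimpleV1(String name, String status) {
        List<String> symbols = Arrays.asList("ON", "OFF"); // enum Status symbols, in schema order
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        byte[] utf8 = name.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);             // field "name": length ...
        out.write(utf8, 0, utf8.length);         // ... followed by the UTF-8 bytes
        writeLong(out, symbols.indexOf(status)); // field "status": only the symbol index

        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(encodeSimpleV1("A", "ON")));  // [2, 65, 0]
        System.out.println(Arrays.toString(encodeSimpleV1("A", "OFF"))); // [2, 65, 2]
    }
}
```

The example record {"name":"A","status":"ON"} becomes the three bytes 02 41 00. Nothing in them identifies test.simple.v1, so the error cannot come from the data; it has to come from comparing the writer and reader schemas when the resolving decoder is built.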


Ber*_*ram 0

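Found a workaround: I moved the enum into an "unversioned" namespace, so it is the same in both versions. But it really looks like a bug to me. Converting the record is not a problem, but the enum does not work, even though both are complex (named) types in Avro.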
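A likely explanation for the asymmetry, assuming an Avro version whose decoder behaves as the stack trace shows (other versions may differ): as far as I can tell, SchemaCompatibility compares the unqualified names of named types, while the resolving decoder requires enum full names to be equal. Record names are evidently not compared there, since the trace shows the record (test.simple.v1.Simple vs test.simple.v2.Simple) being read before the enum fails. The hypothetical class EnumNameCheckSketch below only models those two comparisons with plain strings; it is not Avro code.

```java
// Hypothetical model (NOT Avro code) of two different name comparisons applied to the enum schemas.
public class EnumNameCheckSketch {

    // Unqualified name: everything after the last '.', e.g. "Status".
    static String unqualified(String fullName) {
        return fullName.substring(fullName.lastIndexOf('.') + 1);
    }

    // Models the compatibility check: only unqualified names are compared.
    static boolean compatibilityCheckAccepts(String writerFullName, String readerFullName) {
        return unqualified(writerFullName).equals(unqualified(readerFullName));
    }

    // Models the resolving decoder for enums: full names must be equal.
    static boolean decoderAccepts(String writerFullName, String readerFullName) {
        return writerFullName.equals(readerFullName);
    }

    public static void main(String[] args) {
        String writer = "test.simple.v1.Status";
        String reader = "test.simple.v2.Status";

        System.out.println(compatibilityCheckAccepts(writer, reader)); // true
        System.out.println(decoderAccepts(writer, reader));            // false
        if (!decoderAccepts(writer, reader)) {
            // Same wording as the exception message in the question
            System.out.println("Found " + writer + ", expecting " + reader);
        }

        // With the workaround both sides share one namespace, so both checks pass.
        String shared = "test.model.unversioned.Status";
        System.out.println(decoderAccepts(shared, shared)); // true
    }
}
```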

{
  "type" : "record",
  "name" : "Simple",
  "namespace" : "test.simple.v1",
  "fields" : [ 
      {
        "name" : "name",
        "type" : "string"
      }, 
      {
        "name" : "status",
        "type" : {
          "type" : "enum",
          "name" : "Status",
          "namespace" : "test.model.unversioned",
          "symbols" : [ "ON", "OFF" ]
        },
        "default" : "ON"
      }
   ]
}
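The workaround works because of how an enum's full name is derived. The sketch below encodes the naming rule from the Avro specification: a dotted name is already a full name; otherwise an explicit "namespace" attribute is used; otherwise the namespace of the enclosing named type (here the record) is inherited, and an empty namespace means no namespace. EnumFullNameSketch is a hypothetical helper for illustration, not part of Avro.

```java
// Hypothetical helper modelling the Avro naming rule for a named type such as an enum.
public class EnumFullNameSketch {

    static String fullName(String name, String explicitNamespace, String enclosingNamespace) {
        if (name.contains(".")) {
            return name; // already fully qualified
        }
        // An explicit namespace wins; otherwise inherit the enclosing type's namespace.
        String ns = explicitNamespace != null ? explicitNamespace : enclosingNamespace;
        return (ns == null || ns.isEmpty()) ? name : ns + "." + name;
    }

    public static void main(String[] args) {
        // Original schemas: Status inherits the record's namespace, so the two versions differ.
        System.out.println(fullName("Status", null, "test.simple.v1")); // test.simple.v1.Status
        System.out.println(fullName("Status", null, "test.simple.v2")); // test.simple.v2.Status

        // Workaround: the explicit namespace gives both versions the same full name.
        System.out.println(fullName("Status", "test.model.unversioned", "test.simple.v1")); // test.model.unversioned.Status
        System.out.println(fullName("Status", "test.model.unversioned", "test.simple.v2")); // test.model.unversioned.Status
    }
}
```

With the explicit namespace, both schemas declare test.model.unversioned.Status, so writer and reader enum full names are identical. With specific code generation this also means both record versions refer to the same generated Status enum class.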