使用Avrocoder进行自定义类型和泛型

use*_*464 4 avro google-cloud-platform google-cloud-dataflow

我正在尝试使用AvroCoder序列化自定义类型,该类型在我的管道中的PCollections中传递.自定义类型有一个通用字段(当前是一个字符串)当我运行管道时,我得到如下的AvroTypeException可能是由于泛型字段.为这个对象构建和传递AvroSchema是解决这个问题的唯一方法吗?

Exception in thread "main" org.apache.avro.AvroTypeException: Unknown type: T
 at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:255)
 at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:514)
 at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:593)
 at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:472)
 at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
 at com.google.cloud.dataflow.sdk.coders.AvroCoder.of(AvroCoder.java:116)
Run Code Online (Sandbox Code Playgroud)

我还附上了我的注册表代码以供参考.

pipelineCoderRegistry.registerCoder(GenericTypeClass.class, new CoderFactory() {
    @Override
    public Coder<?> create(List<? extends Coder<?>> componentCoders) {
        return AvroCoder.of(GenericTypeClass.class);
    }

    @Override
    public List<Object> getInstanceComponents(Object value) {
        return Collections.singletonList(((GenericTypeClass<Object>) value).key);
    }
});
Run Code Online (Sandbox Code Playgroud)

Ken*_*les 7

就设置而言,你已经完成了所有工作CoderFactory,但是在撰写本文时,用于自动生成模式的Avro ReflectData机制AvroCoder不适用于泛型类型.这被追踪为AVRO-1571问题.另请参阅此StackOverflow问题.

为了允许GenericTypeClass<T>对某些特定值进行编码T,您必须提供一些显式的架构信息.有两种方法可以继续:

第一种方法是T在您的类型字段中提供显式模式GenericTypeClass<T>,如下所示:

class GenericTypeClass<T> {
  // Avro requires a no-args constructor
  public GenericTypeClass() {}

  @AvroSchema("[\"string\", \"int\", ...]")
  private T genericField;
}
Run Code Online (Sandbox Code Playgroud)

缺点是它仅限于有限的静态联合模式,并且需要手动内联JSON模式以获得更复杂的值T.

第二种方法是在构建AvroCoderin 时提供显式模式CoderFactory,并提供此模式AvroCoder.of(Class, Schema).

pipelineCoderRegistry.registerCoder(GenericTypeClass.class, new CoderFactory() {
  @Override
  public Coder<?> create(List<? extends Coder<?>> componentCoders) {
      return AvroCoder.of(
          GenericTypeClass.class
          schemaFromCoder(componentCoders.get(0)));
  }

  ...
});
Run Code Online (Sandbox Code Playgroud)

这主要围绕将a Coder<T>转换为模式T.这对于基本类型应该很容易,并且对于ReflectData支持的POJO可以管理.它还为更加困难的案件的临时支持提供了一个钩子.