use*_*464 4 avro google-cloud-platform google-cloud-dataflow
我正在尝试使用AvroCoder序列化自定义类型,该类型在我的管道中的PCollections中传递.自定义类型有一个通用字段(当前是一个字符串)当我运行管道时,我得到如下的AvroTypeException可能是由于泛型字段.为这个对象构建和传递AvroSchema是解决这个问题的唯一方法吗?
Exception in thread "main" org.apache.avro.AvroTypeException: Unknown type: T
at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:255)
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:514)
at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:593)
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:472)
at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
at com.google.cloud.dataflow.sdk.coders.AvroCoder.of(AvroCoder.java:116)
Run Code Online (Sandbox Code Playgroud)
我还附上了我的注册表代码以供参考.
pipelineCoderRegistry.registerCoder(GenericTypeClass.class, new CoderFactory() {
@Override
public Coder<?> create(List<? extends Coder<?>> componentCoders) {
return AvroCoder.of(GenericTypeClass.class);
}
@Override
public List<Object> getInstanceComponents(Object value) {
return Collections.singletonList(((GenericTypeClass<Object>) value).key);
}
});
Run Code Online (Sandbox Code Playgroud)
就设置而言,你已经完成了所有工作CoderFactory,但是在撰写本文时,用于自动生成模式的Avro ReflectData机制AvroCoder不适用于泛型类型.这被追踪为AVRO-1571问题.另请参阅此StackOverflow问题.
为了允许GenericTypeClass<T>对某些特定值进行编码T,您必须提供一些显式的架构信息.有两种方法可以继续:
第一种方法是T在您的类型字段中提供显式模式GenericTypeClass<T>,如下所示:
class GenericTypeClass<T> {
// Avro requires a no-args constructor
public GenericTypeClass() {}
@AvroSchema("[\"string\", \"int\", ...]")
private T genericField;
}
Run Code Online (Sandbox Code Playgroud)
缺点是它仅限于有限的静态联合模式,并且需要手动内联JSON模式以获得更复杂的值T.
第二种方法是在构建AvroCoderin 时提供显式模式CoderFactory,并提供此模式AvroCoder.of(Class, Schema).
pipelineCoderRegistry.registerCoder(GenericTypeClass.class, new CoderFactory() {
@Override
public Coder<?> create(List<? extends Coder<?>> componentCoders) {
return AvroCoder.of(
GenericTypeClass.class
schemaFromCoder(componentCoders.get(0)));
}
...
});
Run Code Online (Sandbox Code Playgroud)
这主要围绕将a Coder<T>转换为模式T.这对于基本类型应该很容易,并且对于ReflectData支持的POJO可以管理.它还为更加困难的案件的临时支持提供了一个钩子.
| 归档时间: |
|
| 查看次数: |
1299 次 |
| 最近记录: |