设置自定义编码器和处理参数化类型

use*_*464 5 google-cloud-platform google-cloud-dataflow

我有两个与我在Dataflow管道中遇到的编码器问题有关的问题.

  • 如何为自定义数据类型设置编码器?该类只包含三个项目 - 两个双精度数和另一个参数化属性.我尝试用SerializableCoder注释类型,但我仍然得到错误"com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException:无法根据类接口java.util.Set提供编码器:没有注册CoderFactory为了上课." Set实际上包含参数化的自定义数据类型 - 所以我假设自定义数据类型是问题.我找不到足够的文档/示例正确的方法来做到这一点.请将我指向正确的地方.
  • 即使没有自定义数据类型,每当我尝试切换到参数化版本的Transform函数时,都会导致编码器错误.具体来说,在参数化的复杂变换中,ParDo使用参数化类型,但是当我在ParDo之后对结果PCollection应用Combine.PerKey时,会导致CoderNotFoundException.

关于这两个项目的任何帮助都会有所帮助,因为我现在有点困惑.

Ken*_*les 13

看起来你已被两个问题所困扰.感谢您引起我们的注意!幸运的是,在我们改进的过程中,两者都有简单的解决方法.

第一个问题是,默认的编码器注册表不具有映射的条目Set.classSetCoder.我们已经提交了GitHub问题#56来跟踪其解决方案.在此期间,您可以使用以下代码执行所需的注册:

pipeline.getCoderRegistry().registerCoder(Set.class, SetCoder.class);
Run Code Online (Sandbox Code Playgroud)

第二个问题是参数化类型目前需要在编码器注册表中进行高级处理,因此@DefaultCoder不会受到尊重.我们已经提交了Github问题#57来跟踪这一情况.确保SerializableCoder在任何地方使用的最佳方法是为您的类型CustomType注册一个CoderFactory将返回的类型SerializableCoder.假设你的类型是这样的:

public class CustomType<T extends Serializable> implements Serializable {
  T field;
}
Run Code Online (Sandbox Code Playgroud)

然后,以下代码注册一个CoderFactory产生适当SerializableCoder实例的代码:

pipeline.getCoderRegistry().registerCoder(CustomType.class, new CoderFactory() {
  @Override
  public Coder<?> create(List<? extends Coder<?>>) {
    // No matter what the T is, return SerializableCoder
    return SerializableCoder.of(CustomType.class);
  }

  @Override
  public List<Object> getInstanceComponents(Object value) {
    // Return the T inside your CustomType<T> to enable coder inference for Create
    return Collections.singletonList(((CustomType<Object>) value).field);
  }
});
Run Code Online (Sandbox Code Playgroud)

现在,无论何时CustomType在管道中使用,编码器注册表都会生成一个SerializableCoder.

请注意,这SerializableCoder不是确定性的(编码对象的字节对于对象不一定相等equals()),因此使用此编码器编码的值不能用作GroupByKey操作中的键.