如何在beam sdk 2.0中创建自定义Combine.PerKey

Jon*_*ter 2 google-cloud-dataflow

我们想出了如何在beam sdk 2.0中创建自定义组合函数(经过大量猜测和梁sdk 2.0代码读取之后),因为数据流sdk 1.x语法在sdk 2.0中不起作用.

但是,我们无法弄清楚如何在beam sdk 2.0中创建自定义组合PER KEY功能.任何帮助或指针(或更好的实际例子)将不胜感激.(我们搜索了互联网上的文档或示例,但没有发现;我们也试图查看beam sdk 2.0的Combine类中的代码,但无法弄清楚,特别是因为PerKey类现在有一个私有构造函数,所以我们不能再延长它了.)

如果它有帮助,这里是我们如何在beam sdk 2.0中正确创建自定义组合器(没有)键,但我们无法弄清楚如何使用键创建一个:

public class CombineTemplateIntervalsIntoBlocks
        extends Combine.AccumulatingCombineFn<ImmutableMySetOfIntervals, TemplateIntervalAccum, ArrayList<ImmutableMySetOfIntervals>>{


    public CombineTemplateIntervalsIntoBlocks() {
    }

    @Override
    public TemplateIntervalAccum createAccumulator() {
        return new TemplateIntervalAccum()
    }
Run Code Online (Sandbox Code Playgroud)

然后

public class TemplateIntervalAccum
        implements Combine.AccumulatingCombineFn.Accumulator<ImmutableMySetOfIntervals, TemplateIntervalAccum, ArrayList<ImmutableMySetOfIntervals>>, Serializable {
...
Run Code Online (Sandbox Code Playgroud)

Ben*_*ers 5

您不需要以不同方式创建CombineFn以使用Combine.PerKey.

您可以扩展AccumulatingCombineFn(将合并逻辑放入累加器)或扩展CombineFn(将合并逻辑放入CombineFn).还有其他选项,如BinaryCombineFnIterableCombineFn.

说你有一个CombineFn<InputT, AccumT, OutputT>combineFn:

  • 您可以使用Combine.globally(combineFn)创建a PTransform来获取PCollection<InputT>并组合所有元素.
  • 或者,您可以使用Combine.perKey(combineFn)创建a PTransform来获取PCollection<KV<K, InputT>>并组合与每个键关联的所有值并组合它们.这相当于Combine.PerKey我相信你所指的.