使用CombineFn从所有节点累积数据后,合并每个键的所有值

Sat*_*ram 1 java google-cloud-dataflow

我想在perKey的基础上迭代KV pCollection的值.我使用下面的代码来组合使用自定义类,

PCollection<KV<String, String>> combinesAttributes =
              valExtract.get(extAttUsers).apply(Combine.<String, String>perKey(
                      new CombineAttributes()));
Run Code Online (Sandbox Code Playgroud)

以下是我的自定义组合课程,

public static class CombineAttributes implements SerializableFunction<Iterable<String>, String> {
   @Override
   public String apply(Iterable<String> input) {...}..}
Run Code Online (Sandbox Code Playgroud)

这适用于小输入,但对于大输入,联合收割机不如预期.输出只合并了一些键的值,其他的则丢失了.我假设输出只包含来自一个节点的数据.

https://cloud.google.com/dataflow/model/combine中的文档提及使用CombineFn以便在所有节点中组合每个键的完整值集合.

但是,当我更改下面的自定义组合功能时,我收到以下错误,

incompatible types: CombineAttributes cannot be converted to com.google.cloud.dataflow.sdk.transforms.SerializableFunction<java.lang.Iterable<java.lang.String>,java.lang.String>
Run Code Online (Sandbox Code Playgroud)

结合功能

public static class CombineAttributes extends CombineFn<Iterable<String>, CombineAttributes.Accum, String> {

public static class Accum {
  List<String> inputList = new ArrayList<String>();
}
public Accum createAccumulator() { return new Accum(); }
public Accum addInput(Accum accum, Iterable<String> input) {
  for (String item : input) {
    accum.inputList.add(item);
  }
  return accum;
}
public Accum mergeAccumulators(Iterable<Accum> accums) {
   Accum merged = createAccumulator();
   for (Accum accum : accums) {
     for (String item : accum.inputList) {
       merged.inputList.add(item);
     }
   }
   return merged;
 }
 public String extractOutput(Accum accum) {
   return "";
 }
}
Run Code Online (Sandbox Code Playgroud)

没有可用于组合perKey扩展的示例代码CombineFn.请让我知道上面的代码有什么问题.

Ben*_*ers 6

如果你只是想通过所有的值进行迭代,你可以使用GroupByKey转一PCollection<KV<K, V>>PCollection<KV<K, Iterable<V>>.然后你可以写一个DoFn处理它的每个元素,并在里面迭代Iterable<V>.

请注意,您只会在同一窗口中收到与键相关联的所有值.如果您使用的是默认全局窗口,那么这将是所有值.


CombineCombineFn当您想要将所有值组合成较小的输出时,它们非常有用.例如,如果您想获取所有值的总和或均值,那么使用Sum.perKey()或更有效Mean.perKey().效率来自于能够传递(和合并)累加器.在这种情况下Sum,这对应于部分和.

例如,假设管道在两台机器上运行.第一台机器进程KV<user1, attr1a>, KV<user1, attr1b>, KV<user2, attr2a>和第二台机器进程KV<user1, attr1c>, KV<user2, attr2b>.

CombineAttributes(这是实现两种方式)将首先在每台机器上调用.所以它可以组合[attr1a, attr1b]成一个字符串或累加器(比方说attr1a+attr1b).然后,它会在其他机器上运行相结合[attr1c]attr1c.然后它将合并所有这些部分结果以获得最终的累加器 - attr1a+attr1b+attr1c.在原始实现的情况下,这将是最终答案.在后者中,extractOutput将在此累加器上调用.