在Apache Beam中为不同的BigQuery表写入不同的值

jkf*_*kff 9 google-bigquery google-cloud-dataflow apache-beam

假设我有一个PCollection<Foo>并且我想将它写入多个BigQuery表,为每个表选择一个可能不同的表Foo.

如何使用Apache Beam BigQueryIOAPI 执行此操作?

jkf*_*kff 23

使用BigQueryIOApache Beam中最近添加的功能可以实现这一点.

PCollection<Foo> foos = ...;
foos.apply(BigQueryIO.write().to(new SerializableFunction<ValueInSingleWindow<Foo>, TableDestination>() {
  @Override
  public TableDestination apply(ValueInSingleWindow<Foo> value) {  
    Foo foo = value.getValue();
    // Also available: value.getWindow(), getTimestamp(), getPane()
    String tableSpec = ...;
    String tableDescription = ...;
    return new TableDestination(tableSpec, tableDescription);
  }
}).withFormatFunction(new SerializableFunction<Foo, TableRow>() {
  @Override
  public TableRow apply(Foo foo) {
    return ...;
  }
}).withSchema(...));
Run Code Online (Sandbox Code Playgroud)

根据输入PCollection<Foo>是有界还是无界,这将导致创建多个BigQuery导入作业(每个表一个或多个,具体取决于数据量),或者它将使用BigQuery流插入API.

API最灵活的版本使用DynamicDestinations,它允许您使用不同的模式将不同的值写入不同的表,甚至允许您在所有这些计算中使用来自管道其余部分的侧输入.

此外,BigQueryIO已被重构为许多可重用的转换,您可以自己组合这些转换以实现更复杂的用例 - 请参阅源目录中的文件.

此功能将包含在Apache Beam的第一个稳定版本中,并将包含在Dataflow SDK的下一个版本中(将基于Apache Beam的第一个稳定版本).现在你可以通过在github上对HEAD的梁的快照运行你的管道来使用它.

  • Python SDK现在提供此功能吗? (2认同)