jkf*_*kff 9 google-bigquery google-cloud-dataflow apache-beam
假设我有一个PCollection<Foo>并且我想将它写入多个BigQuery表,为每个表选择一个可能不同的表Foo.
如何使用Apache Beam BigQueryIOAPI 执行此操作?
jkf*_*kff 23
使用BigQueryIOApache Beam中最近添加的功能可以实现这一点.
PCollection<Foo> foos = ...;
foos.apply(BigQueryIO.write().to(new SerializableFunction<ValueInSingleWindow<Foo>, TableDestination>() {
@Override
public TableDestination apply(ValueInSingleWindow<Foo> value) {
Foo foo = value.getValue();
// Also available: value.getWindow(), getTimestamp(), getPane()
String tableSpec = ...;
String tableDescription = ...;
return new TableDestination(tableSpec, tableDescription);
}
}).withFormatFunction(new SerializableFunction<Foo, TableRow>() {
@Override
public TableRow apply(Foo foo) {
return ...;
}
}).withSchema(...));
Run Code Online (Sandbox Code Playgroud)
根据输入PCollection<Foo>是有界还是无界,这将导致创建多个BigQuery导入作业(每个表一个或多个,具体取决于数据量),或者它将使用BigQuery流插入API.
API最灵活的版本使用DynamicDestinations,它允许您使用不同的模式将不同的值写入不同的表,甚至允许您在所有这些计算中使用来自管道其余部分的侧输入.
此外,BigQueryIO已被重构为许多可重用的转换,您可以自己组合这些转换以实现更复杂的用例 - 请参阅源目录中的文件.
此功能将包含在Apache Beam的第一个稳定版本中,并将包含在Dataflow SDK的下一个版本中(将基于Apache Beam的第一个稳定版本).现在你可以通过在github上对HEAD的梁的快照运行你的管道来使用它.
| 归档时间: |
|
| 查看次数: |
2439 次 |
| 最近记录: |