sel*_*tle 1 python google-bigquery google-cloud-dataflow
在为Dataflow构建管道时,我有一个看似简单的问题.我有多个管道从外部源获取数据,转换数据并将其写入几个BigQuery表.完成此过程后,我想运行一个查询刚刚生成的表的查询.理想情况下,我希望这可以在同一份工作中发生.
这是Dataflow的使用方式,还是应该在作业之间分配对BigQuery的加载和表的查询?
如果在同一个工作中这是可能的,那么如何解决这个问题,因为BigQuerySink不会生成PCollection?如果在同一个作业中无法做到这一点,是否有某种方法可以在完成另一个作业(即写作和查询作业)时触发作业?
你提到在一个工作中需要做的事情 - BigQuerySink
需要产生一个PCollection
.即使它是空的,您也可以使用它作为步骤的输入,该步骤BigQuery
以使得该步骤等待直到第一个接收器完成的方式读取.
您需要创建自己的BigQuerySink版本才能执行此操作.
如果可能的话,更简单的选择可能是从您写入BigQuery的集合中读取第二步,而不是阅读刚刚放入BigQuery的表.例如:
PCollection<TableRow> rows = ...;
rows.apply(BigQuery.Write.to(...));
rows.apply(/* rest of the pipeline */);
Run Code Online (Sandbox Code Playgroud)
如果您想继续处理写入BigQuery而不是表行的元素,您甚至可以更早地执行此操作.