Sar*_*mro 1 google-cloud-dataflow apache-beam apache-beam-io
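Hi, I have several queries that I want to run one after another with Apache Beam, saving the result of each. I have seen some similar questions but could not find an answer. I am used to designing pipelines with Airflow and am fairly new to Apache Beam. I am using the Dataflow runner. I want query2 to run only after the results of query1 have been saved to the corresponding table. How do I chain them? Here is my current code: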
PCollection<TableRow> resultsStep1 = getData("Run Query 1",
        "Select * FROM basetable");
resultsStep1.apply("Save Query1 data",
        BigQueryIO.writeTableRows()
                .withSchema(BigQueryUtils.toTableSchema(resultsStep1.getSchema()))
                .to("resultsStep1")
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
);

PCollection<TableRow> resultsStep2 = getData("Run Query 2",
        "Select * FROM resultsStep1");
resultsStep2.apply("Save Query2 data",
        BigQueryIO.writeTableRows()
                .withSchema(BigQueryUtils.toTableSchema(resultsStep2.getSchema()))
                .to("resultsStep2")
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
);
Here is the definition of my getData function:
private PCollection<TableRow> getData(final String taskName, final String query) {
    return pipeline.apply(taskName,
            BigQueryIO.readTableRowsWithSchema()
                    .fromQuery(query)
                    .usingStandardSql()
                    .withCoder(TableRowJsonCoder.of()));
}
Edit (update): here is what I found in the documentation:
You can’t sequence the completion of a BigQuery write with other steps of your pipeline.
I think this is a major limitation when designing pipelines. Source: https://beam.apache.org/documentation/io/built-in/google-bigquery/#limitations
You can use the Wait transform to do this. Here is a contrived example:
PCollection<Void> firstWriteResults = data.apply(ParDo.of(...write to first database...));
data.apply(Wait.on(firstWriteResults))
    // Windows of this intermediate PCollection will be processed no earlier than when
    // the respective window of firstWriteResults closes.
    .apply(ParDo.of(...write to second database...));
You can find more details in the API documentation here: https://beam.apache.org/releases/javadoc/2.17.0/index.html?org/apache/beam/sdk/transforms/Wait.html
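To make the contrived example above concrete, here is a minimal sketch that fills in the two elided ParDo steps with hypothetical in-memory stand-ins. The class name WaitOnSketch, the two DoFn classes, and the EVENTS list are illustrative assumptions, not part of the question's pipeline. The sketch also assumes the Beam Java SDK and the DirectRunner are on the classpath, since the shared static list is only meaningful when everything runs in one JVM.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;

public class WaitOnSketch {
  // Hypothetical stand-in for both sinks: an in-memory log of "writes".
  // Assumption: a single-JVM runner (e.g. the DirectRunner), so the static is shared.
  static final List<String> EVENTS = new CopyOnWriteArrayList<>();

  // Stand-in for "write to first database": records the row and emits nothing.
  static class FirstWriteFn extends DoFn<String, Void> {
    @ProcessElement
    public void processElement(@Element String row) {
      EVENTS.add("first:" + row);
    }
  }

  // Stand-in for "write to second database".
  static class SecondWriteFn extends DoFn<String, Void> {
    @ProcessElement
    public void processElement(@Element String row) {
      EVENTS.add("second:" + row);
    }
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<String> data = pipeline.apply("Create rows", Create.of("a", "b", "c"));

    // The output of the first write is used purely as a completion signal.
    PCollection<Void> firstWriteResults =
        data.apply("First write", ParDo.of(new FirstWriteFn())).setCoder(VoidCoder.of());

    // Elements of `data` are held back until the matching window of the
    // signal closes, i.e. until the first write has finished for that window.
    data.apply("Wait for first write", Wait.on(firstWriteResults))
        .apply("Second write", ParDo.of(new SecondWriteFn()))
        .setCoder(VoidCoder.of());

    pipeline.run().waitUntilFinish();

    // Every "first:" entry should appear before any "second:" entry;
    // the order of rows within each group is not guaranteed.
    System.out.println(EVENTS);
  }
}
```

Note that Wait.on is applied to an existing PCollection, whereas getData in the question applies BigQueryIO's read directly to the pipeline. Carrying this pattern over to the BigQuery case would therefore need a signal collection from the first write and an input collection to hold back before the second query, and whether the SDK provides these depends on the version in use.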