写表后Apache Beam Pipeline查询表

ree*_*106 3 python google-cloud-dataflow apache-beam

我有一个将结果写入 BigQuery 表的 Apache Beam/Dataflow 管道。然后我想查询此表以获取管道的单独部分。但是,我似乎无法弄清楚如何正确设置此管道依赖项。我编写(然后想要查询)的新表与用于某些过滤逻辑的单独表相连,这就是为什么我实际上需要编写表然后运行查询。逻辑如下:

with beam.Pipeline(options=pipeline_options) as p:
    table_data = p | 'CreatTable' >> # ... logic to generate table ...

    # Write Table to BQ
    table_written = table_data | 'WriteTempTrainDataBQ' >> beam.io.WriteToBigQuery(...)

    query_results = table_written | 'QueryNewTable' >> beam.io.Read(beam.io.BigQuerySource(query=query_new_table))
Run Code Online (Sandbox Code Playgroud)

ifquery_new_table实际上是对已经存在的 BQ 表的查询,我更改为query_results = p |而不是table_written这正常工作。但是,如果我尝试查询我在管道中间写入的表,那么在实际生成该表之前,我无法让管道步骤“等待”。有没有办法做到这一点,我忽略了?

当我尝试按顺序执行此步骤时,我收到一个断言错误assert isinstance(pbegin, pvalue.PBegin) AssertionError,我正在阅读这意味着这table_written是问题,因为它不是有效的 PCollection 实例。

有谁知道我可以用什么来代替 table_written 来让它按需要按顺序运行?

jkf*_*kff 5

Beam 目前不支持用例“在 BigQuery 写入完成后做某事”。唯一的解决方法是运行单独的管道:让您的主程序:运行写入 BigQuery 的管道;等待管道完成;运行另一个从 BigQuery 读取的管道。

这是一个非常常见的功能,我们正在开始设计这种支持(更一般地说,升级各种 IO 写入以支持它们之后的排序操作),但我不知道我们什么时候会完成。