编辑:我使用 beam.io.WriteToBigQuery 并打开了接收器实验选项来使其工作。我实际上已经打开了它,但我的问题是我试图从包含在 str() 中的两个变量(数据集+表)“构建”完整的表引用。这是将整个值提供程序参数数据作为字符串,而不是调用 get() 方法来仅获取值。
我正在尝试生成一个数据流模板,然后从 GCP 云功能进行调用。(作为参考,我的数据流作业应该读取一个包含一堆文件名的文件,然后从 GCS 读取所有这些文件并将其写入 BQ )。因此,我需要以这种方式编写它,以便我可以使用运行时值提供程序来传递 BigQuery 数据集/表。
我的帖子底部是我目前的代码,省略了一些与问题无关的内容。特别注意 BQ_flexible_writer(beam.DoFn) - 这就是我尝试“自定义”beam.io.WriteToBigQuery 的地方,以便它接受运行时值提供程序。
我的模板生成得很好,当我在不提供运行时变量(依赖于默认值)的情况下测试运行管道时,它会成功,并且在控制台中查看作业时我会看到添加的行。但是,在检查 BigQuery 时没有数据(三次检查日志中的数据集/表名称是否正确)。不确定它去了哪里或者我可以添加什么日志记录来了解元素发生了什么?
你知道这里发生了什么吗?或者关于如何使用运行时变量写入 BigQuery 的建议?我什至可以按照将 beam.io.WriteToBigQuery 包含在 DoFn 中的方式来调用它,还是必须采用 beam.io.WriteToBigQuery 背后的实际代码并使用它?
#=========================================================
class BQ_flexible_writer(beam.DoFn):
def __init__(self, dataset, table):
self.dataset = dataset
self.table = table
def process(self, element):
dataset_res = self.dataset.get()
table_res = self.table.get()
logging.info('Writing to table: {}.{}'.format(dataset_res,table_res))
beam.io.WriteToBigQuery(
#dataset= runtime_options.dataset,
table = str(dataset_res) + '.' + str(table_res),
schema = SCHEMA_ADFImpression,
project = str(PROJECT_ID), #options.display_data()['project'],
create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED, …Run Code Online (Sandbox Code Playgroud) python runtime google-cloud-platform google-cloud-dataflow apache-beam