动态分叉梁(数据流)管道基于TaggedOutputs的数量

and*_*622 4 google-cloud-platform google-cloud-dataflow apache-beam

我目前正在尝试根据数据中包含的特定密钥将在Google Dataflow上运行的Beam管道分成多个目标.当使用TaggedOutput标记对"fork"的每个端点进行硬编码时,我能够正常工作.但是,在将来,我不会总是知道底层数据中存在哪些键,因此我希望使用类似于下面的for循环动态创建流程中的后续步骤:

p = beam.Pipeline(options=pipeline_options)

pipe = p | 'ReadFromGCS' >> ReadFromText(args['input']) \
         | 'TagAllLines' >> beam.ParDo(produce_tagged_output_keys).with_outputs()

for client in pipe:
  client = pipe.client | client+'MapLinesToDicts' >> beam.Map(lambda line: dict(record=line)) \
                         | client+'WriteToBQTable' >> WriteToBigQuery(client+'_test', dataset=bq_dataset, project=project_id, schema='record:string')
Run Code Online (Sandbox Code Playgroud)

我的理解是结果 .with_outputs()应该是可迭代的,不是吗?当我运行上面的命令时,它会pipe毫无问题地执行,但完全忽略for循环.有没有办法动态地做到这一点,我错过了?

Rob*_*bbe 7

.with_outputs()如果指定了标记,则结果只能迭代

.with_outputs()没有指定标签的结果是不可迭代的,因为在管道构建时尚不知道输出.然而
,结果.with_outputs(tag1, tag2, main=tag0)是可迭代的:
在这里测试差异

请记住,使用apache beam,首先构造管道,然后让数据流过它.因此,您不能让流水线架构在运行时依赖于数据.

我会提出另一个解决方案:

假设您有一个包含以下结构的元素的pcollection 集合:

(key, bq_row)
Run Code Online (Sandbox Code Playgroud)

其中是决定写入哪个表的功能,以及bq_row要写入bigquery行的数据.

然后,您可以按键对元素进行分组:

grouped = collection | beam.GroupByKey()
Run Code Online (Sandbox Code Playgroud)

现在,您为每个不同的键获得一个元素.因此,一个元素中的所有bq_row应该写入相同的bigquery表,这取决于元素键.

然后,您可以定义DoFn将每个元素中的所有行写入适当的表.(示例代码)

class WriteToBigQueryFn(beam.DoFn):

    def __init__(self, dataset_name):
        super(BigQueryWriter, self).__init__()
        self.client = bigquery.Client()
        self.dataset = client.dataset(dataset_name)

    def process(self, (key, data)):
        table_name = get_table_name(key) # get table name based on key
        table = self.dataset.table(table_name)
        table.reload(self.client)

        table.insert_data(data)

grouped | beam.ParDo(WriteToBigQueryFn)
Run Code Online (Sandbox Code Playgroud)

  • 如果指定了标记,`.with_outputs()`的结果只能是可迭代的.我在答案中添加了一些代码示例的澄清说明. (2认同)