and*_*622 4 google-cloud-platform google-cloud-dataflow apache-beam
我目前正在尝试根据数据中包含的特定密钥将在Google Dataflow上运行的Beam管道分成多个目标.当使用TaggedOutput标记对"fork"的每个端点进行硬编码时,我能够正常工作.但是,在将来,我不会总是知道底层数据中存在哪些键,因此我希望使用类似于下面的for循环动态创建流程中的后续步骤:
p = beam.Pipeline(options=pipeline_options)
pipe = p | 'ReadFromGCS' >> ReadFromText(args['input']) \
| 'TagAllLines' >> beam.ParDo(produce_tagged_output_keys).with_outputs()
for client in pipe:
client = pipe.client | client+'MapLinesToDicts' >> beam.Map(lambda line: dict(record=line)) \
| client+'WriteToBQTable' >> WriteToBigQuery(client+'_test', dataset=bq_dataset, project=project_id, schema='record:string')
Run Code Online (Sandbox Code Playgroud)
我的理解是结果 .with_outputs()
应该是可迭代的,不是吗?当我运行上面的命令时,它会pipe
毫无问题地执行,但完全忽略for循环.有没有办法动态地做到这一点,我错过了?
.with_outputs()
如果指定了标记,则结果只能迭代
.with_outputs()
没有指定标签的结果是不可迭代的,因为在管道构建时尚不知道输出.然而
,结果.with_outputs(tag1, tag2, main=tag0)
是可迭代的:
在这里测试差异
请记住,使用apache beam,首先构造管道,然后让数据流过它.因此,您不能让流水线架构在运行时依赖于数据.
我会提出另一个解决方案:
假设您有一个包含以下结构的元素的pcollection
集合:
(key, bq_row)
Run Code Online (Sandbox Code Playgroud)
其中键是决定写入哪个表的功能,以及bq_row要写入bigquery行的数据.
然后,您可以按键对元素进行分组:
grouped = collection | beam.GroupByKey()
Run Code Online (Sandbox Code Playgroud)
现在,您为每个不同的键获得一个元素.因此,一个元素中的所有bq_row应该写入相同的bigquery表,这取决于元素键.
然后,您可以定义DoFn
将每个元素中的所有行写入适当的表.(示例代码)
class WriteToBigQueryFn(beam.DoFn):
def __init__(self, dataset_name):
super(BigQueryWriter, self).__init__()
self.client = bigquery.Client()
self.dataset = client.dataset(dataset_name)
def process(self, (key, data)):
table_name = get_table_name(key) # get table name based on key
table = self.dataset.table(table_name)
table.reload(self.client)
table.insert_data(data)
grouped | beam.ParDo(WriteToBigQueryFn)
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
858 次 |
最近记录: |