cod*_*one 4 dataflow python-3.x google-cloud-dataflow apache-beam
没有关于如何将pCollections转换为输入到.CoGroupByKey()所需的pCollections的文档
上下文基本上我有两个大的pCollections,我需要能够找到两者之间的差异,对于第二类ETL更改(如果它在pColl1中不存在,那么添加到pColl2中的嵌套字段),这样我就能够从BigQuery保留这些记录的历史记录.
管道架构:
建议任何帮助.我在SO上发现了一个java链接,它做了我需要完成的同样的事情(但是在Python SDK上没有任何内容).
从PCollection <TableRow>转换为PCollection <KV <K,V >>
是否有Apache Beam的文档/支持,特别是Python SDK?
为了获得CoGroupByKey()工作,你需要有PCollections的tuples,其中第一个元素将是关键和第二- 数据.
在你的情况下,你说你有BigQuerySource,在当前版本的Apache Beam输出PCollection of dictionaries(代码)中,每个条目代表表中读取的一行.您需要将此PCollections映射到元组,如上所述.这很容易使用ParDo:
class MapBigQueryRow(beam.DoFn):
def process(self, element, key_column):
key = element.get(key_column)
yield key, element
data1 = (p
| "Read #1 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #1"))
| "Map #1 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_1"))
data2 = (p
| "Read #2 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #2"))
| "Map #2 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_2"))
co_grouped = ({"data1": data1, "data2": data2} | beam.CoGroupByKey())
# do your processing with co_grouped here
Run Code Online (Sandbox Code Playgroud)
顺便说一下,可以在这里找到Python SDK for Apache Beam的文档.
| 归档时间: |
|
| 查看次数: |
1328 次 |
| 最近记录: |