Ton*_*nca 3 python google-cloud-dataflow apache-beam
I am trying to perform a denormalization operation in which I need to reorganize a table using the following logic:
| itemid | class | value |
+--------+-------+-------+
|      1 | A     |   0.2 |       | itemid | value A | value B | value C |
|      1 | B     |  10.3 |  ==>  +--------+---------+---------+---------+
|      2 | A     |   3.0 |  ==>  |      1 |     0.2 |    10.3 |         |
|      2 | B     |   0.2 |  ==>  |      2 |     3.0 |     0.2 |         |
|      3 | A     |   0.0 |       |      3 |     0.0 |     1.2 |     5.4 |
|      3 | B     |   1.2 |
|      3 | C     |   5.4 |
My approach is to run a for loop that filters by class, since I know the list of classes a priori, and then join the resulting PCollections.
High-level code:
CLASSES = ["A", "B", "C"]
tables = [
    (
        data
        | "Filter by Language" >> beam.Filter(lambda elem: elem["class"] == c)
        | "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))
    )
    for c in CLASSES
]
And the join (edited following Peter Kim's suggestion):
_ = (
    tables
    | "Flatten" >> beam.Flatten()
    | "Join Collections" >> beam.GroupByKey()
    | "Remove key" >> beam.MapTuple(lambda _, val: val)
    | "Merge dicts" >> beam.ParDo(mergeDicts())
    | "Write to GCS" >> beam.io.WriteToText(output_file)
)
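The `mergeDicts` DoFn used in the join is not shown. As a rough sketch of what such a merge step might do, here is a plain function that turns the grouped rows of one `itemid` into a single wide row. The name `merge_rows`, the `classes` parameter, and the `value A` style column names (borrowed from the target table) are assumptions for illustration, not the asker's actual code.

```python
def merge_rows(rows, classes=("A", "B", "C")):
    """Merge the long-format rows of one itemid into a single wide row."""
    rows = list(rows)
    wide = {"itemid": rows[0]["itemid"]}
    # Start with every column empty so that missing classes show up as None.
    wide.update({f"value {c}": None for c in classes})
    for row in rows:
        wide[f"value {row['class']}"] = row["value"]
    return wide


grouped = [
    {"itemid": 1, "class": "A", "value": 0.2},
    {"itemid": 1, "class": "B", "value": 10.3},
]
print(merge_rows(grouped))
```

Since it takes one element and returns one element, a function like this could presumably be applied with `beam.Map(merge_rows)` after the "Remove key" step.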
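My problem is that when the pipeline is executed by the Apache Beam engine, I get identical PCollections, all filtered by the last element of the list (C in this case).
[Added] It looks like the Apache Beam engine uses the iteration variable in its final state, that is, the last element of the iterated list, for every branch that is called.
I am obviously following the wrong approach, but what would be the best way to perform this operation?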
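What you are running into is a surprising gotcha involving closures, loops, and Python scoping: each lambda looks up `c` only when it is called, by which time the loop has already finished. You can work around it by binding the variable explicitly instead of pulling it from the closure. For example: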
tables = [
    (
        data
        # Pass it as a side input to Filter.
        # Labels must be unique per applied transform, so include the class.
        | f"Filter by class {c}" >> beam.Filter(lambda elem, cls: elem["class"] == cls, c)
        | f"Add id as key {c}" >> beam.Map(lambda elem: (elem["itemid"], elem))
    )
    for c in CLASSES
]
or
tables = [
    (
        data
        # Explicitly capture it as a default value in the lambda.
        # Labels must be unique per applied transform, so include the class.
        | f"Filter by class {c}" >> beam.Filter(lambda elem, cls=c: elem["class"] == cls)
        | f"Add id as key {c}" >> beam.Map(lambda elem: (elem["itemid"], elem))
    )
    for c in CLASSES
]
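The pitfall can be reproduced without Beam at all. The following plain-Python sketch compares lambdas that read `c` from the enclosing scope with lambdas that capture it as a default argument; the names `late`, `early`, and `row` are made up for illustration.

```python
CLASSES = ["A", "B", "C"]

# Late binding: every lambda looks up `c` when it is called,
# which happens after the loop has finished, so `c` is always "C".
late = [lambda elem: elem["class"] == c for c in CLASSES]

# Early binding: the default value is evaluated when each lambda is created,
# so every lambda keeps its own class.
early = [lambda elem, cls=c: elem["class"] == cls for c in CLASSES]

row = {"itemid": 1, "class": "A", "value": 0.2}
late_results = [f(row) for f in late]
early_results = [f(row) for f in early]

print(late_results)   # [False, False, False]
print(early_results)  # [True, False, False]
```

With late binding all three filters behave like the filter for "C", which matches the symptom described in the question.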
Partition (`beam.Partition`) also works well here, both to avoid this pitfall and to express your intent.
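As a rough sketch of the Partition idea, assuming the same `CLASSES` list and row dicts as in the question: `beam.Partition` calls a function with the element and the number of partitions and expects an integer index back. The names `class_index` and `split_by_class` are hypothetical.

```python
CLASSES = ["A", "B", "C"]


def class_index(elem, num_partitions):
    """Partition function: return the index of the output for this row."""
    return CLASSES.index(elem["class"])


def split_by_class(data):
    """Split a PCollection of row dicts into one keyed PCollection per class."""
    # Imported here so that class_index can be tested without Beam installed.
    import apache_beam as beam

    parts = data | "Split by class" >> beam.Partition(class_index, len(CLASSES))
    return [
        part | f"Key {name} by itemid" >> beam.Map(lambda elem: (elem["itemid"], elem))
        for name, part in zip(CLASSES, parts)
    ]
```

Because the class is read from each element rather than from a loop variable, there is no closure over `c` to go wrong, and the input is traversed by a single transform instead of one filter per class.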