Branching Apache Beam pipelines with loops

Ton*_*nca 3 python google-cloud-dataflow apache-beam

I'm trying to perform a denormalization operation in which I need to reorganize a table using the following logic:

| itemid | class | value |
+--------+-------+-------+
| 1      | A     | 0.2   |       | itemid | value A | value B | value C |
| 1      | B     | 10.3  |  ==>  +--------+---------+---------+---------+
| 2      | A     | 3.0   |  ==>  | 1      |   0.2   |  10.3   |         |
| 2      | B     | 0.2   |  ==>  | 2      |   3.0   |   0.2   |         |
| 3      | A     | 0.0   |       | 3      |   0.0   |   1.2   |   5.4   | 
| 3      | B     | 1.2   |  
| 3      | C     | 5.4   |      

My approach is to run a for loop that filters by class, since I know the list of classes a priori, and then join the resulting PCollections.

High-level code:

CLASSES = ["A", "B", "C"]

tables = [  
    (
        data
        | "Filter by Language" >> beam.Filter(lambda elem: elem["class"]==c)
        | "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))
    )
    for c in CLASSES
]

And the join (edited following Peter Kim's suggestion):

_ = ( 
    tables
    | "Flatten" >> beam.Flatten()
    | "Join Collections" >> beam.GroupByKey()
    | "Remove key" >> beam.MapTuple(lambda _, val: val)
    | "Merge dicts" >> beam.ParDo(mergeDicts())
    | "Write to GCS" >> beam.io.WriteToText(output_file)
)
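The `mergeDicts` DoFn is not shown above. As a rough sketch only, and assuming each grouped value is an iterable of row dicts with the keys `itemid`, `class` and `value`, a plain-function equivalent (the name `merge_dicts` and the output keys are hypothetical) could look like this:

```python
def merge_dicts(rows):
    """Collapse all rows of one item into a single wide record.

    Hypothetical helper: the output keys ("value A", "value B", ...)
    are an assumption based on the target table in the question.
    """
    rows = list(rows)  # grouped values may be a one-shot iterable
    merged = {"itemid": rows[0]["itemid"]}
    for row in rows:
        merged[f"value {row['class']}"] = row["value"]
    return merged


item_3 = merge_dicts([
    {"itemid": 3, "class": "A", "value": 0.0},
    {"itemid": 3, "class": "B", "value": 1.2},
    {"itemid": 3, "class": "C", "value": 5.4},
])
```

A function like this would be applied with `beam.Map(merge_dicts)` rather than `beam.ParDo(...)`, since it returns exactly one record per group.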

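My problem is that when the pipeline runs in the Apache Beam engine, I get identical PCollections, all filtered by the last element of the list (C in this case).

[ADDED] It looks like the Apache Beam engine takes the iteration variable in its final state, that is, the last element of the iterated list, for every branch that was created.

I'm obviously following the wrong approach, but what would be the best way to perform this operation?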

rob*_*twb 5

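What you're running into is a surprising gotcha involving closures, loops, and Python scoping. You can work around it by binding the variable explicitly rather than pulling it from the closure. For example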

tables = [  
    (
        data
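        # Pass it as a side input to Filter, so it is bound when the transform is built.
        # Labels include the class because Beam requires unique labels within a pipeline.
        | f"Filter by class {c}" >> beam.Filter(lambda elem, cls: elem["class"] == cls, c)
        | f"Add id as key ({c})" >> beam.Map(lambda elem: (elem["itemid"], elem))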
    )
    for c in CLASSES
]

Or

tables = [  
    (
        data
        # Explicitly capture it as a default value in the lambda.
        # Labels include the class because Beam requires unique labels within a pipeline.
        | f"Filter by class {c}" >> beam.Filter(lambda elem, cls=c: elem["class"] == cls)
        | f"Add id as key ({c})" >> beam.Map(lambda elem: (elem["itemid"], elem))
    )
    for c in CLASSES
]
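The same behaviour can be reproduced without Beam. The following minimal sketch (the names `late`, `early` and `row` are just for illustration) shows that a lambda created in a loop looks up the loop variable when it is called, whereas a default argument is evaluated when the lambda is created:

```python
CLASSES = ["A", "B", "C"]

# Late binding: every lambda reads `c` when it is called, i.e. after the
# comprehension has finished and `c` is "C".
late = [lambda elem: elem["class"] == c for c in CLASSES]

# Early binding: the default value `cls=c` is evaluated once per iteration,
# so each lambda keeps its own class.
early = [lambda elem, cls=c: elem["class"] == cls for c in CLASSES]

row = {"itemid": 1, "class": "A", "value": 0.2}
late_results = [f(row) for f in late]
early_results = [f(row) for f in early]

print(late_results)   # [False, False, False]
print(early_results)  # [True, False, False]
```

All three `late` filters compare against "C", which matches the symptom described in the question.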

Partition would also work well here, both to avoid this pitfall and to express your intent.
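A minimal sketch of the Partition approach, assuming the same `CLASSES` list and row dicts as in the question. The names `class_index` and `build_tables` are hypothetical, and `build_tables` is meant as a drop-in replacement for the `tables` list:

```python
CLASSES = ["A", "B", "C"]


def class_index(elem, num_partitions):
    """Partition function: send each row to the slot of its class.

    Beam passes the element and the number of partitions; the class is
    read from the element itself, so no loop variable is captured.
    """
    return CLASSES.index(elem["class"])


def build_tables(data):
    """Split `data` into one keyed PCollection per class."""
    # Imported here so that class_index stays usable without Beam installed.
    import apache_beam as beam

    branches = data | "Split by class" >> beam.Partition(class_index, len(CLASSES))
    return [
        # The label is built eagerly, and the lambda does not reference `cls`.
        branch | f"Add id as key ({cls})" >> beam.Map(lambda elem: (elem["itemid"], elem))
        for cls, branch in zip(CLASSES, branches)
    ]
```

With this in place, `tables = build_tables(data)` can be fed to the same Flatten and GroupByKey steps as before. Note that a row whose class is not in `CLASSES` would raise a `ValueError` in `class_index`, so unknown classes would need to be filtered out first.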