Selecting columns from highly nested data

use*_*871 6 python dataframe avro apache-spark pyspark

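For the dataframe below, which is generated from an avro file, I'm trying to get the column names as a list or in some other format so that I can use them in a select statement. node1 and node2 have the same elements. For example, I know we can do df.select(col('data.node1.name')), but I'm not sure: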

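  1. how to select all the columns at once without hardcoding every column name, and
  2. how to handle the nested part. I think that, to keep it readable, productvalues and porders should be selected into separate dataframes/tables?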

Input schema:

root
  |-- metadata: struct
  |...
  |-- data: struct
  |    |-- node1: struct
  |    |    |-- name: string
  |    |    |-- productlist: array
  |    |    |    |-- element: struct
  |    |    |    |    |-- productvalues: array
  |    |    |    |    |    |-- element: struct
  |    |    |    |    |    |    |-- pname: string
  |    |    |    |    |    |    |-- porders: array
  |    |    |    |    |    |    |    |-- element: struct
  |    |    |    |    |    |    |    |    |-- ordernum: int
  |    |    |    |    |    |    |    |    |-- field: string
  |    |-- node2: struct
  |    |    |-- name: string
  |    |    |-- productlist: array
  |    |    |    |-- element: struct
  |    |    |    |    |-- productvalues: array
  |    |    |    |    |    |-- element: struct
  |    |    |    |    |    |    |-- pname: string
  |    |    |    |    |    |    |-- porders: array
  |    |    |    |    |    |    |    |-- element: struct
  |    |    |    |    |    |    |    |    |-- ordernum: int
  |    |    |    |    |    |    |    |    |-- field: string

Zyg*_*ygD 1

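With the following approach you won't need to hardcode all the struct fields. However, you do need to provide a list of the columns/fields whose type is an array of structs. You have 3 such fields, and we will add one more column, so 4 in total.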
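If you would rather not type even that list, it can be derived from the schema. In PySpark you would walk `df.schema`; the sketch below models that walk in plain Python so the logic is easy to see. It assumes a hypothetical nested-dict notation for the schema (a `dict` is a struct, a one-element `list` is an array of that element type, a string is a primitive type), and the function name `find_array_of_struct_fields` is illustrative only.

```python
def find_array_of_struct_fields(schema):
    """Return names of fields typed array<struct<...>>, parents before children."""
    found = []

    def walk(dtype):
        if isinstance(dtype, dict):  # struct: visit every field
            for name, child in dtype.items():
                if isinstance(child, list) and isinstance(child[0], dict):
                    if name not in found:  # node1/node2 repeat the same names
                        found.append(name)
                walk(child)
        elif isinstance(dtype, list):  # array: descend into the element type
            walk(dtype[0])
        # a str is a primitive type, so the recursion stops there

    walk(schema)
    return found


# The question's schema in the hypothetical dict notation
PRODUCTLIST = [{"productvalues": [{"pname": "string",
                                   "porders": [{"ordernum": "int", "field": "string"}]}]}]
SCHEMA = {
    "metadata": {"fld1": "string", "fld2": "string"},
    "data": {
        "node1": {"name": "string", "productlist": PRODUCTLIST},
        "node2": {"name": "string", "productlist": PRODUCTLIST},
    },
}

# "data" is still a struct here; the answer turns it into an array first, hence the prefix
levels = ["data"] + find_array_of_struct_fields(SCHEMA)
print(levels)  # ['data', 'productlist', 'productvalues', 'porders']
```

The depth-first walk always lists a parent array before the arrays nested inside it, which is the order the `inline` loop needs. Against a real `df.schema`, the same checks would be `isinstance(t, StructType)` and `isinstance(t, ArrayType) and isinstance(t.elementType, StructType)` from `pyspark.sql.types`.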

First, a dataframe similar to yours:

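from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Assumes a local session; reuse your existing `spark` if you already have one
spark = SparkSession.builder.getOrCreate()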

df = spark.createDataFrame(
    [(
        ('a', 'b'),
        (
            (
                'name_1',
                [
                    ([
                        (
                            'pname_111',
                            [
                                (1111, 'field_1111'),
                                (1112, 'field_1112')
                            ]
                        ),
                        (
                            'pname_112',
                            [
                                (1121, 'field_1121'),
                                (1122, 'field_1122')
                            ]
                        )
                    ],),
                    ([
                        (
                            'pname_121',
                            [
                                (1211, 'field_1211'),
                                (1212, 'field_1212')
                            ]
                        ),
                        (
                            'pname_122',
                            [
                                (1221, 'field_1221'),
                                (1222, 'field_1222')
                            ]
                        )
                    ],)
                ]
            ),
            (
                'name_2',
                [
                    ([
                        (
                            'pname_211',
                            [
                                (2111, 'field_2111'),
                                (2112, 'field_2112')
                            ]
                        ),
                        (
                            'pname_212',
                            [
                                (2121, 'field_2121'),
                                (2122, 'field_2122')
                            ]
                        )
                    ],),
                    ([
                        (
                            'pname_221',
                            [
                                (2211, 'field_2211'),
                                (2212, 'field_2212')
                            ]
                        ),
                        (
                            'pname_222',
                            [
                                (2221, 'field_2221'),
                                (2222, 'field_2222')
                            ]
                        )
                    ],)
                ]
            )
        ),
    )],
    'metadata:struct<fld1:string,fld2:string>, data:struct<node1:struct<name:string, productlist:array<struct<productvalues:array<struct<pname:string, porders:array<struct<ordernum:int, field:string>>>>>>>, node2:struct<name:string, productlist:array<struct<productvalues:array<struct<pname:string, porders:array<struct<ordernum:int, field:string>>>>>>>>'
)
# df.printSchema()
# root
#  |-- metadata: struct (nullable = true)
#  |    |-- fld1: string (nullable = true)
#  |    |-- fld2: string (nullable = true)
#  |-- data: struct (nullable = true)
#  |    |-- node1: struct (nullable = true)
#  |    |    |-- name: string (nullable = true)
#  |    |    |-- productlist: array (nullable = true)
#  |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |-- productvalues: array (nullable = true)
#  |    |    |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |    |    |-- pname: string (nullable = true)
#  |    |    |    |    |    |    |-- porders: array (nullable = true)
#  |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |    |    |    |    |-- ordernum: integer (nullable = true)
#  |    |    |    |    |    |    |    |    |-- field: string (nullable = true)
#  |    |-- node2: struct (nullable = true)
#  |    |    |-- name: string (nullable = true)
#  |    |    |-- productlist: array (nullable = true)
#  |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |-- productvalues: array (nullable = true)
#  |    |    |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |    |    |-- pname: string (nullable = true)
#  |    |    |    |    |    |    |-- porders: array (nullable = true)
#  |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |    |    |    |    |-- ordernum: integer (nullable = true)
#  |    |    |    |    |    |    |    |    |-- field: string (nullable = true)

Answer

  • Spark 3.1+

    nodes = df.select("data.*").columns
    for n in nodes:
        df = df.withColumn("data", F.col("data").withField(n, F.struct(F.lit(n).alias("node"), f"data.{n}.*")))
    df = df.withColumn("data", F.array("data.*"))
    
    for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
        df = df.select(
            *[c for c in df.columns if c != arr_of_struct],
            F.expr(f"inline({arr_of_struct})")
        )
    
  • Lower Spark versions:

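    # Column.withField only exists from Spark 3.1, so rebuild the "data" struct instead:
    # the tagged node goes first, followed by the other node fields unchanged.
    # The field order inside "data" changes on each pass, so rows may come out
    # in a different order than with the 3.1+ version.
    nodes = df.select("data.*").columns
    for n in nodes:
        df = df.withColumn(
            "data",
            F.struct(
                F.struct(F.lit(n).alias("node"), f"data.{n}.*").alias(n),
                *[f"data.{c}" for c in df.select("data.*").columns if c != n]
            )
        )
    df = df.withColumn("data", F.array("data.*"))
    
    for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
        df = df.select(
            *[c for c in df.columns if c != arr_of_struct],
            F.expr(f"inline({arr_of_struct})")
        )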

Result:

df.printSchema()
# root
#  |-- metadata: struct (nullable = true)
#  |    |-- fld1: string (nullable = true)
#  |    |-- fld2: string (nullable = true)
#  |-- node: string (nullable = false)
#  |-- name: string (nullable = true)
#  |-- pname: string (nullable = true)
#  |-- ordernum: integer (nullable = true)
#  |-- field: string (nullable = true)

df.show()
# +--------+-----+------+---------+--------+----------+
# |metadata| node|  name|    pname|ordernum|     field|
# +--------+-----+------+---------+--------+----------+
# |  {a, b}|node1|name_1|pname_111|    1111|field_1111|
# |  {a, b}|node1|name_1|pname_111|    1112|field_1112|
# |  {a, b}|node1|name_1|pname_112|    1121|field_1121|
# |  {a, b}|node1|name_1|pname_112|    1122|field_1122|
# |  {a, b}|node1|name_1|pname_121|    1211|field_1211|
# |  {a, b}|node1|name_1|pname_121|    1212|field_1212|
# |  {a, b}|node1|name_1|pname_122|    1221|field_1221|
# |  {a, b}|node1|name_1|pname_122|    1222|field_1222|
# |  {a, b}|node2|name_2|pname_211|    2111|field_2111|
# |  {a, b}|node2|name_2|pname_211|    2112|field_2112|
# |  {a, b}|node2|name_2|pname_212|    2121|field_2121|
# |  {a, b}|node2|name_2|pname_212|    2122|field_2122|
# |  {a, b}|node2|name_2|pname_221|    2211|field_2211|
# |  {a, b}|node2|name_2|pname_221|    2212|field_2212|
# |  {a, b}|node2|name_2|pname_222|    2221|field_2221|
# |  {a, b}|node2|name_2|pname_222|    2222|field_2222|
# +--------+-----+------+---------+--------+----------+

Explanation

nodes = df.select("data.*").columns
for n in nodes:
    df = df.withColumn("data", F.col("data").withField(n, F.struct(F.lit(n).alias("node"), f"data.{n}.*")))

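With the above, I chose to keep each node's title in case you need it later. The code first gets the list of nodes from the fields of the "data" column. Using that list, the for loop adds one more field, "node", inside each node struct, holding that node's title.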
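To make the effect of that loop concrete, here is a plain-Python model of what it does to the "data" value of one row. It assumes each struct is represented as a dict; `tag_nodes` is a hypothetical name and this is not Spark code.

```python
def tag_nodes(data):
    """Model of the withField loop: copy each node struct and put a 'node'
    field holding the node's own name in front of its existing fields."""
    tagged = {}
    for n, node in data.items():
        # F.struct(F.lit(n).alias("node"), f"data.{n}.*") places "node" first
        tagged[n] = {"node": n, **node}
    return tagged


data = {
    "node1": {"name": "name_1", "productlist": []},
    "node2": {"name": "name_2", "productlist": []},
}
print(tag_nodes(data)["node2"])  # {'node': 'node2', 'name': 'name_2', 'productlist': []}
```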

df = df.withColumn("data", F.array("data.*"))

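The line above converts the "data" column from a struct into an array of the node structs, so that in the next step it can easily be exploded into rows and columns.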
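A plain-Python model of that conversion, again assuming structs are dicts (`struct_to_array` is a hypothetical name). It also mimics, roughly, the requirement that all elements of a Spark array share one type, which holds here only because node1 and node2 have identical schemas.

```python
def struct_to_array(data):
    """Model of F.array("data.*"): the struct's field values become array elements."""
    values = list(data.values())
    # Spark needs one element type; approximate that check by comparing field names
    first_keys = list(values[0])
    if any(list(v) != first_keys for v in values):
        raise TypeError("array elements must share one struct type")
    return values


data = {
    "node1": {"node": "node1", "name": "name_1", "productlist": []},
    "node2": {"node": "node2", "name": "name_2", "productlist": []},
}
arr = struct_to_array(data)
print([e["node"] for e in arr])  # ['node1', 'node2']
```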

for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
    df = df.select(
        *[c for c in df.columns if c != arr_of_struct],
        F.expr(f"inline({arr_of_struct})")
    )

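The key line above is F.expr(f"inline({arr_of_struct})"). It has to be used inside a loop because inline is a generator, and Spark does not let you nest generators. inline explodes an array of structs into rows, turning each struct field into its own column. Here you have 4 arrays of structs (data, productlist, productvalues, porders), so the loop applies inline 4 times, unpacking one level per select.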
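The following plain-Python model shows what the loop does level by level, using a trimmed version of the sample row (one node, one pname, two orders). It assumes rows and structs are dicts; `inline` here is a hypothetical stand-in for Spark's SQL function of the same name, not a call into Spark.

```python
def inline(rows, col):
    """Model of select(*other_cols, inline(col)): one output row per struct in
    row[col]; the other columns are repeated and the struct fields are appended."""
    out = []
    for row in rows:
        rest = {k: v for k, v in row.items() if k != col}
        # Spark's inline drops rows whose array is null or empty (inline_outer keeps them)
        for struct in row[col] or []:
            out.append({**rest, **struct})
    return out


row = {
    "metadata": ("a", "b"),
    "data": [
        {"node": "node1", "name": "name_1", "productlist": [
            {"productvalues": [
                {"pname": "pname_111", "porders": [
                    {"ordernum": 1111, "field": "field_1111"},
                    {"ordernum": 1112, "field": "field_1112"},
                ]},
            ]},
        ]},
    ],
}

rows = [row]
# One pass per array-of-struct level, mirroring one select per generator
for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
    rows = inline(rows, arr_of_struct)

for r in rows:
    print(r)
```

The resulting keys come out as metadata, node, name, pname, ordernum, field, the same column order as the Spark result above, because each pass keeps the remaining columns first and appends the unpacked struct fields at the end.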