How to explode a nested dataframe in PySpark and store it further into Hive


I'm facing a particular issue where my JSON structure looks like this:

{A: value,
 B: value,
 C: Array<Struct<A1: value, B1: value, C1: Array<Struct<A2: value, B2: value>>>>
}

I want it exploded into the following form:

{
A:value,
B:value,
A1:value,
B1:value,
A2:value,
B2:value
}

I'm using PySpark dataframes for this but can't find a way to explode it correctly. Any help is appreciated.


Let's start with a sample dataframe with the same schema as the one you specified:

import pyspark.sql.functions as psf
df = sc.parallelize([["a", "b", "c.a1", "c.b1", "c.c1.a2", "c.c1.a3"]]).toDF(["A", "B", "A1", "B1", "A2", "B2"])\
    .select("A", "B", psf.array(psf.struct("A1", "B1", psf.array(psf.struct("A2", "B2")).alias("C1"))).alias("C"))
df.printSchema()

    root
     |-- A: string (nullable = true)
     |-- B: string (nullable = true)
     |-- C: array (nullable = false)
     |    |-- element: struct (containsNull = false)
     |    |    |-- A1: string (nullable = true)
     |    |    |-- B1: string (nullable = true)
     |    |    |-- C1: array (nullable = false)
     |    |    |    |-- element: struct (containsNull = false)
     |    |    |    |    |-- A2: string (nullable = true)
     |    |    |    |    |-- B2: string (nullable = true)

You can explode the inline elements of a StructType when selecting a column by using a star (i.e. select("C1.*") if C1 were a StructType). Your case is a bit more complicated since these StructTypes are nested inside an ArrayType. Nevertheless, in Spark 2 you can access the nested elements of a StructType contained in an ArrayType, and the output will be an ArrayType of those elements:

df.select("C.A1").show()

    +------+
    |    A1|
    +------+
    |[c.a1]|
    +------+

You can automate this process with the following functions.

Get a column's nested field names:

def get_subcols(df, col):
    # return the nested field names of `col` (a struct or array<struct> column)
    if col in df.columns:
        col_type = [s["type"] for s in df.schema.jsonValue()["fields"] if s["name"] == col][0]
        # an array<struct> keeps its fields under "elementType", a plain struct directly under "fields"
        subschema = col_type["elementType"]["fields"] if col_type["type"] == "array" else col_type["fields"]
        return [s["name"] for s in subschema]
    else:
        return None
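For instance, on the sample dataframe above (a quick sanity check of the function as written):

get_subcols(df, "C")

    ['A1', 'B1', 'C1']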

Flatten the dataframe by one level:

import re

def flatten_df(df):
    # split columns into nested (struct or array<struct>) and non-nested ones
    non_nested_cols = [c[0] for c in df.dtypes if not re.match("array<struct|struct", c[1])]
    nested_cols = [c[0] for c in df.dtypes if re.match("array<struct|struct", c[1])]
    # replace every nested column with one column per nested field
    return df.select(non_nested_cols + [psf.col(c1 + "." + c2) for c1 in nested_cols for c2 in get_subcols(df, c1)])

Since your dataframe needs to be flattened twice, you'll have to use explode at some point, because the first pass leaves you with an ArrayType of ArrayType of StructType:

df1 = flatten_df(df)
df1.printSchema()

    root
     |-- A: string (nullable = true)
     |-- B: string (nullable = true)
     |-- A1: array (nullable = false)
     |    |-- element: string (containsNull = false)
     |-- B1: array (nullable = false)
     |    |-- element: string (containsNull = false)
     |-- C1: array (nullable = false)
     |    |-- element: array (containsNull = false)
     |    |    |-- element: struct (containsNull = false)
     |    |    |    |-- A2: string (nullable = true)
     |    |    |    |-- B2: string (nullable = true)
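In Spark 2 you can then finish the flattening by exploding each array before the next pass, so that flatten_df only ever sees plain structs. A minimal sketch (it relies on the struct branch in get_subcols above):

# explode the outer array, flatten, then explode the inner array and flatten again
df2 = flatten_df(df.select("A", "B", psf.explode("C").alias("C")))
df3 = flatten_df(df2.select("A", "B", "A1", "B1", psf.explode("C1").alias("C1")))

This yields the same flat table as the Spark 1 version below.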

With Spark 1, you'd have to use explode at every step:

df.select("A", "B", psf.explode("C").alias("C"))\
    .select("A", "B", "C.*")\
    .select("A", "B", "A1", "B1", psf.explode("C1").alias("C1"))\
    .select("A", "B", "A1", "B1", "C1.*")\
    .show()

    +---+---+----+----+-------+-------+
    |  A|  B|  A1|  B1|     A2|     B2|
    +---+---+----+----+-------+-------+
    |  a|  b|c.a1|c.b1|c.c1.a2|c.c1.a3|
    +---+---+----+----+-------+-------+

Note that explode creates as many rows as there are elements in the array (here each array contains a single element). You can then group by keys such as A and B to get back to a nested dataframe.
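For example, a sketch of re-nesting with collect_list and then persisting the flat result to Hive (assuming Spark 2 with Hive support enabled; "my_db.my_table" is a placeholder table name):

flat = df.select("A", "B", psf.explode("C").alias("C"))\
    .select("A", "B", "C.*")\
    .select("A", "B", "A1", "B1", psf.explode("C1").alias("C1"))\
    .select("A", "B", "A1", "B1", "C1.*")

# group back on the top-level keys, collecting the nested fields into an array of structs
flat.groupBy("A", "B").agg(psf.collect_list(psf.struct("A1", "B1", "A2", "B2")).alias("C"))

# one way to store the flattened dataframe to Hive
flat.write.mode("overwrite").saveAsTable("my_db.my_table")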