Abh*_*ash -1 json hive dataframe apache-spark pyspark
我面临一个特定的问题,其中我的 JSON 结构如下:-
{A:value,
B:value,
C:Array<Struct<A1:value,B1:value, C1:Array<struct<A2:value,B2:value>>>>
}
Run Code Online (Sandbox Code Playgroud)
我希望它以以下形式爆炸:-
{
A:value,
B:value,
A1:value,
B1:value,
A2:value,
B2:value
}
Run Code Online (Sandbox Code Playgroud)
我为此使用了 pyspark 数据框,但找不到正确爆炸的方法。任何帮助表示赞赏。
让我们从一个示例数据帧开始,其架构与您指定的架构相同:
import pyspark.sql.functions as psf
df = sc.parallelize([["a", "b", "c.a1", "c.b1", "c.c1.a2", "c.c1.a3"]]).toDF(["A", "B", "A1", "B1", "A2", "B2"])\
.select("A", "B", psf.array(psf.struct("A1", "B1", psf.array(psf.struct("A2", "B2")).alias("C1"))).alias("C"))
df.printSchema()
root
|-- A: string (nullable = true)
|-- B: string (nullable = true)
|-- C: array (nullable = false)
| |-- element: struct (containsNull = false)
| | |-- A1: string (nullable = true)
| | |-- B1: string (nullable = true)
| | |-- C1: array (nullable = false)
| | | |-- element: struct (containsNull = false)
| | | | |-- A2: string (nullable = true)
| | | | |-- B2: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
您可以在选择列时StructType
使用分解内联元素*
(即select("C1.*")
if C1
was a StrucType
)。您的情况有点复杂,因为这些StructType
s 嵌套在ArrayType
. 尽管如此,在Spark2 中,您可以访问StructType
包含在 an 中的嵌套元素ArrayType
,输出将是ArrayType
以下元素中的an :
df.select("C.A1").show()
+------+
| A1|
+------+
|[c.a1]|
+------+
Run Code Online (Sandbox Code Playgroud)
您可以使用以下功能自动执行此过程:
获取列的嵌套列:
def get_subcols(df, col):
if col in df.columns:
subschema = [s["type"]["elementType"]["fields"] for s in df.schema.jsonValue()["fields"] if s["name"] == col][0]
return [s["name"] for s in subschema]
else:
return None
Run Code Online (Sandbox Code Playgroud)
将数据框展平为一个级别:
import re
def flatten_df(df):
non_nested_cols = [c[0] for c in df.dtypes if not re.match("array<struct|struct", c[1])]
nested_cols = [c[0] for c in df.dtypes if re.match("array<struct|struct", c[1])]
return df.select(non_nested_cols + [psf.col(c1 + "." + c2) for c1 in nested_cols for c2 in get_subcols(df, c1)])
Run Code Online (Sandbox Code Playgroud)
由于您的数据帧需要拼合两次,你将不得不使用explode
在某些时候,因为你会得到一个ArrayType
的ArrayType
的StructType
:
df1 = flatten_df(df)
df1.printSchema()
root
|-- A: string (nullable = true)
|-- B: string (nullable = true)
|-- A1: array (nullable = false)
| |-- element: string (containsNull = false)
|-- B1: array (nullable = false)
| |-- element: string (containsNull = false)
|-- C1: array (nullable = false)
| |-- element: array (containsNull = false)
| | |-- element: struct (containsNull = false)
| | | |-- A2: string (nullable = true)
| | | |-- B2: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
对于Spark1,您必须explode
每次都使用:
df.select("A", "B", psf.explode("C").alias("C"))\
.select("A", "B", "C.*")\
.select("A", "B", "A1", "B1", psf.explode("C1").alias("C1"))\
.select("A", "B", "A1", "B1", "C1.*")\
.show()
+---+---+----+----+-------+-------+
| A| B| A1| B1| A2| B2|
+---+---+----+----+-------+-------+
| a| b|c.a1|c.b1|c.c1.a2|c.c1.a3|
+---+---+----+----+-------+-------+
Run Code Online (Sandbox Code Playgroud)
请注意,explode 创建的行数与数组中的行数一样多(这里每个数组中只有一个元素)。然后你可以group
使用A, B
例如键来返回数据框。