For the dataframe below, generated from an avro file, I'm trying to get the column names in a list or some other format so that I can use them in a select statement. node1 and node2 have the same elements. For example, I know we can do df.select(col('data.node1.name')), but I'm not sure how to select productvalues and porders into a separate dataframe/table. Input schema:
root
 |-- metadata: struct
 |...
 |-- data: struct
 |    |-- node1: struct
 |    |    |-- name: string
 |    |    |-- productlist: array
 |    |    |    |-- element: struct
 |    |    |    |    |-- productvalues: array
 |    |    |    |    |    |-- element: struct
 |    |    |    |    |    |    |-- pname: string
 |    |    |    |    |    |    |-- porders: array
 |    |    |    |    |    |    |    |-- element: struct
 |    |    |    |    |    |    |    |    |-- ordernum: int
 |    |    |    |    |    |    |    |    |-- field: string
 |    |-- node2: struct
 |    |    |-- name: string
 |    |    |-- productlist: array
 |    |    |    |-- element: struct
 |    |    |    |    |-- productvalues: array
 |    |    |    |    |    |-- element: struct
 |    |    |    |    |    |    |-- pname: string
 |    |    |    |    |    |    |-- porders: array
 |    |    |    |    |    |    |    |-- element: struct
 |    |    |    |    |    |    |    |    |-- ordernum: int
 |    |    |    |    |    |    |    |    |-- field: string
With the approach below you won't need to hard-code all the struct fields. You will, however, need to provide the list of columns/fields that are of array-of-struct type. You have 3 such fields, and we will add one more such column, so 4 in total.
First, a dataframe similar to yours:
from pyspark.sql import functions as F

# assumes an active SparkSession named `spark` (as in spark-shell / notebooks)
df = spark.createDataFrame(
    [(
        ('a', 'b'),  # metadata
        (
            (        # node1
                'name_1',
                [
                    ([
                        ('pname_111', [(1111, 'field_1111'), (1112, 'field_1112')]),
                        ('pname_112', [(1121, 'field_1121'), (1122, 'field_1122')]),
                    ],),
                    ([
                        ('pname_121', [(1211, 'field_1211'), (1212, 'field_1212')]),
                        ('pname_122', [(1221, 'field_1221'), (1222, 'field_1222')]),
                    ],),
                ]
            ),
            (        # node2
                'name_2',
                [
                    ([
                        ('pname_211', [(2111, 'field_2111'), (2112, 'field_2112')]),
                        ('pname_212', [(2121, 'field_2121'), (2122, 'field_2122')]),
                    ],),
                    ([
                        ('pname_221', [(2211, 'field_2211'), (2212, 'field_2212')]),
                        ('pname_222', [(2221, 'field_2221'), (2222, 'field_2222')]),
                    ],),
                ]
            )
        )
    )],
    'metadata:struct<fld1:string,fld2:string>, data:struct<node1:struct<name:string, productlist:array<struct<productvalues:array<struct<pname:string, porders:array<struct<ordernum:int, field:string>>>>>>>, node2:struct<name:string, productlist:array<struct<productvalues:array<struct<pname:string, porders:array<struct<ordernum:int, field:string>>>>>>>>'
)
# df.printSchema()
# root
# |-- metadata: struct (nullable = true)
# | |-- fld1: string (nullable = true)
# | |-- fld2: string (nullable = true)
# |-- data: struct (nullable = true)
# | |-- node1: struct (nullable = true)
# | | |-- name: string (nullable = true)
# | | |-- productlist: array (nullable = true)
# | | | |-- element: struct (containsNull = true)
# | | | | |-- productvalues: array (nullable = true)
# | | | | | |-- element: struct (containsNull = true)
# | | | | | | |-- pname: string (nullable = true)
# | | | | | | |-- porders: array (nullable = true)
# | | | | | | | |-- element: struct (containsNull = true)
# | | | | | | | | |-- ordernum: integer (nullable = true)
# | | | | | | | | |-- field: string (nullable = true)
# | |-- node2: struct (nullable = true)
# | | |-- name: string (nullable = true)
# | | |-- productlist: array (nullable = true)
# | | | |-- element: struct (containsNull = true)
# | | | | |-- productvalues: array (nullable = true)
# | | | | | |-- element: struct (containsNull = true)
# | | | | | | |-- pname: string (nullable = true)
# | | | | | | |-- porders: array (nullable = true)
# | | | | | | | |-- element: struct (containsNull = true)
# | | | | | | | | |-- ordernum: integer (nullable = true)
# | | | | | | | | |-- field: string (nullable = true)
Answer
Spark 3.1+
nodes = df.select("data.*").columns
for n in nodes:
    df = df.withColumn("data", F.col("data").withField(n, F.struct(F.lit(n).alias("node"), f"data.{n}.*")))
df = df.withColumn("data", F.array("data.*"))
for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
    df = df.select(
        *[c for c in df.columns if c != arr_of_struct],
        F.expr(f"inline({arr_of_struct})")
    )
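For reference, Column.withField was added in Spark 3.1: it returns the struct column with one field added or replaced, which is what the first loop relies on. A minimal standalone sketch with toy data (the toy column names are made up for illustration):

toy = spark.createDataFrame([((1, 2),)], 's:struct<a:int,b:int>')
toy = toy.withColumn('s', F.col('s').withField('c', F.lit(99)))  # add field 'c' to struct 's'
toy.show()
# +----------+
# |         s|
# +----------+
# |{1, 2, 99}|
# +----------+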
Lower Spark versions:
nodes = df.select("data.*").columns
for n in nodes:
    df = df.withColumn(
        "data",
        F.struct(
            F.struct(F.lit(n).alias("node"), f"data.{n}.*").alias(n),
            *[f"data.{c}" for c in df.select("data.*").columns if c != n]
        )
    )
df = df.withColumn("data", F.array("data.*"))
for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
    df = df.select(
        *[c for c in df.columns if c != arr_of_struct],
        F.expr(f"inline({arr_of_struct})")
    )
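The only real difference from the Spark 3.1+ version is the first loop: without Column.withField, the whole struct has to be rebuilt with F.struct, listing the new field plus all the existing ones. A minimal sketch of that idea on the same made-up toy struct:

toy = spark.createDataFrame([((1, 2),)], 's:struct<a:int,b:int>')
# rebuild struct 's', prepending a new field 'c' and keeping 'a' and 'b'
toy = toy.withColumn('s', F.struct(F.lit(99).alias('c'), 's.a', 's.b'))
toy.show()
# +----------+
# |         s|
# +----------+
# |{99, 1, 2}|
# +----------+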
Result:
df.printSchema()
# root
# |-- metadata: struct (nullable = true)
# | |-- fld1: string (nullable = true)
# | |-- fld2: string (nullable = true)
# |-- node: string (nullable = false)
# |-- name: string (nullable = true)
# |-- pname: string (nullable = true)
# |-- ordernum: integer (nullable = true)
# |-- field: string (nullable = true)
df.show()
# +--------+-----+------+---------+--------+----------+
# |metadata| node| name| pname|ordernum| field|
# +--------+-----+------+---------+--------+----------+
# | {a, b}|node1|name_1|pname_111| 1111|field_1111|
# | {a, b}|node1|name_1|pname_111| 1112|field_1112|
# | {a, b}|node1|name_1|pname_112| 1121|field_1121|
# | {a, b}|node1|name_1|pname_112| 1122|field_1122|
# | {a, b}|node1|name_1|pname_121| 1211|field_1211|
# | {a, b}|node1|name_1|pname_121| 1212|field_1212|
# | {a, b}|node1|name_1|pname_122| 1221|field_1221|
# | {a, b}|node1|name_1|pname_122| 1222|field_1222|
# | {a, b}|node2|name_2|pname_211| 2111|field_2111|
# | {a, b}|node2|name_2|pname_211| 2112|field_2112|
# | {a, b}|node2|name_2|pname_212| 2121|field_2121|
# | {a, b}|node2|name_2|pname_212| 2122|field_2122|
# | {a, b}|node2|name_2|pname_221| 2211|field_2211|
# | {a, b}|node2|name_2|pname_221| 2212|field_2212|
# | {a, b}|node2|name_2|pname_222| 2221|field_2221|
# | {a, b}|node2|name_2|pname_222| 2222|field_2222|
# +--------+-----+------+---------+--------+----------+
Explanation
nodes = df.select("data.*").columns
for n in nodes:
    df = df.withColumn("data", F.col("data").withField(n, F.struct(F.lit(n).alias("node"), f"data.{n}.*")))
With the above, I decided to preserve the node titles in case you ever need them. It first gets the list of nodes from the "data" column's fields. Using that list, the for loop creates one more field inside every node's struct, holding the node's title.
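For the example dataframe above, that node list is simply (a quick check, run on the freshly created df):

nodes = df.select("data.*").columns
print(nodes)
# ['node1', 'node2']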
df = df.withColumn("data", F.array("data.*"))
The above converts the "data" column from struct type to array type, so that in the next step we can easily explode it into columns.
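Note that this only works because node1 and node2 (after the previous step) share an identical struct schema, which F.array requires of its elements. Abridged, the schema at this point would look roughly like this:

# df.printSchema()  (abridged sketch)
# root
#  |-- metadata: struct
#  |-- data: array
#  |    |-- element: struct
#  |    |    |-- node: string
#  |    |    |-- name: string
#  |    |    |-- productlist: array ...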
for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
    df = df.select(
        *[c for c in df.columns if c != arr_of_struct],
        F.expr(f"inline({arr_of_struct})")
    )
The key line above is F.expr(f"inline({arr_of_struct})"). It must be used inside a loop, because it is a generator, and in Spark generators cannot be nested within one another. inline explodes an array of structs into columns. In this step you have 4 array-of-struct fields, so the loop creates 4 inline expressions.
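As a standalone illustration of inline (toy data, column names made up):

toy = spark.createDataFrame([([(1, 'x'), (2, 'y')],)], 'arr:array<struct<n:int,s:string>>')
toy.select(F.expr("inline(arr)")).show()
# +---+---+
# |  n|  s|
# +---+---+
# |  1|  x|
# |  2|  y|
# +---+---+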