Tags: arrays, struct, dataframe, apache-spark, apache-spark-sql
Given this schema:
root
|-- Elems: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Elem: integer (nullable = true)
| | |-- Desc: string (nullable = true)
How can we add a new field so the schema becomes:
root
|-- Elems: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- New_field: integer (nullable = true)
| | |-- Elem: integer (nullable = true)
| | |-- Desc: string (nullable = true)
I have already done this with a plain struct (more details at the bottom of this post), but I cannot do it with an array of structs.
Here is the code to test it:
val schema = new StructType()
.add("Elems", ArrayType(new StructType()
.add("Elem", IntegerType)
.add("Desc", StringType)
))
val dataDS = Seq("""
{
"Elems": [ {"Elem":1, "Desc": "d1"}, {"Elem":2, "Desc": "d2"}, {"Elem":3, "Desc": "d3"} ]
}
""").toDS()
val df = spark.read.schema(schema).json(dataDS.rdd)
df.show(false)
+---------------------------+
|Elems |
+---------------------------+
|[[1, d1], [2, d2], [3, d3]]|
+---------------------------+
Once we have the DF, my best attempt was to build a struct of arrays out of the elements:
val mod_df = df.withColumn("modif_elems",
struct(
array(lit("")).as("New_field"),
col("Elems.Elem"),
col("Elems.Desc")
))
mod_df.show(false)
+---------------------------+-----------------------------+
|Elems |modif_elems |
+---------------------------+-----------------------------+
|[[1, d1], [2, d2], [3, d3]]|[[], [1, 2, 3], [d1, d2, d3]]|
+---------------------------+-----------------------------+
mod_df.printSchema
root
|-- Elems: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Elem: integer (nullable = true)
| | |-- Desc: string (nullable = true)
|-- modif_elems: struct (nullable = false)
| |-- New_field: array (nullable = false)
| | |-- element: string (containsNull = false)
| |-- Elem: array (nullable = true)
| | |-- element: integer (containsNull = true)
| |-- Desc: array (nullable = true)
| | |-- element: string (containsNull = true)
We do not lose any data, but this is not exactly what I want.
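For completeness, one workaround that keeps the target shape on Spark versions before 2.4 is to explode the array, rebuild each struct with the extra field, and re-aggregate per row. This is only a sketch: the synthetic `row_id`, the constant value for `New_field`, and the assumption that `collect_list` order is acceptable (it is not guaranteed in general) are all mine.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Sketch of a pre-2.4 workaround: explode, rebuild each struct, re-aggregate.
val spark = SparkSession.builder.master("local[1]").appName("demo").getOrCreate()
import spark.implicits._

val df = spark.read.json(Seq(
  """{"Elems":[{"Elem":1,"Desc":"d1"},{"Elem":2,"Desc":"d2"},{"Elem":3,"Desc":"d3"}]}"""
).toDS())

val rebuilt = df
  .withColumn("row_id", monotonically_increasing_id())  // tag each original row
  .withColumn("e", explode(col("Elems")))               // one row per array element
  .withColumn("e", struct(                              // rebuild the struct
    lit(0).as("New_field"),                             // hypothetical new value
    col("e.Elem").as("Elem"),
    col("e.Desc").as("Desc")))
  .groupBy("row_id")
  .agg(collect_list("e").as("Elems"))                   // back into an array
  .drop("row_id")

rebuilt.printSchema()
```

The cost is a shuffle per `groupBy`, which is why the later `arrays_zip` / `transform` approaches are preferable when available.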
Update: workaround in PD1.
The code is almost the same, but now there is no array of structs, so modifying the struct is easier:
val schema = new StructType()
.add("Elems", new StructType()
.add("Elem", IntegerType)
.add("Desc", StringType)
)
val dataDS = Seq("""
{
"Elems": {"Elem":1, "Desc": "d1"}
}
""").toDS()
val df = spark.read.schema(schema).json(dataDS.rdd)
df.show(false)
+-------+
|Elems |
+-------+
|[1, d1]|
+-------+
df.printSchema
root
|-- Elems: struct (nullable = true)
| |-- Elem: integer (nullable = true)
| |-- Desc: string (nullable = true)
In this case, to add the field we only need to build another struct:
val mod_df = df
.withColumn("modif_elems",
struct(
lit("").alias("New_field"),
col("Elems.Elem"),
col("Elems.Desc")
)
)
mod_df.show
+-------+-----------+
| Elems|modif_elems|
+-------+-----------+
|[1, d1]| [, 1, d1]|
+-------+-----------+
mod_df.printSchema
root
|-- Elems: struct (nullable = true)
| |-- Elem: integer (nullable = true)
| |-- Desc: string (nullable = true)
|-- modif_elems: struct (nullable = false)
| |-- New_field: string (nullable = false)
| |-- Elem: integer (nullable = true)
| |-- Desc: string (nullable = true)
OK, I have also tried the arrays_zip Spark SQL function (new in version 2.4.0), and it is almost what I want, but I cannot see how to rename the resulting fields (as and alias do not work here):
val mod_df = df.withColumn("modif_elems",
arrays_zip(
array(lit("")).as("New_field"),
col("Elems.Elem").as("Elem"),
col("Elems.Desc").alias("Desc")
)
)
mod_df.show(false)
+---------------------------+---------------------------------+
|Elems |modif_elems |
+---------------------------+---------------------------------+
|[[1, d1], [2, d2], [3, d3]]|[[, 1, d1], [, 2, d2], [, 3, d3]]|
+---------------------------+---------------------------------+
mod_df.printSchema
root
|-- Elems: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Elem: integer (nullable = true)
| | |-- Desc: string (nullable = true)
|-- modif_elems: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- 0: string (nullable = true)
| | |-- 1: integer (nullable = true)
| | |-- 2: string (nullable = true)
The struct inside modif_elems should contain three fields named New_field, Elem and Desc, not 0, 1 and 2.
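On Spark 2.4, one way to get the field names back is to cast the result of `arrays_zip` to an `array<struct<...>>` type with the desired names (struct-to-struct casts match fields positionally). A sketch of that idea, keeping the placeholder `New_field` array from the question (being shorter than the others, it is null-padded by `arrays_zip`):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Sketch (Spark 2.4+): rename the 0/1/2 fields produced by arrays_zip
// by casting to an explicitly named array<struct<...>> type.
val spark = SparkSession.builder.master("local[1]").appName("demo").getOrCreate()
import spark.implicits._

val df = spark.read.json(Seq(
  """{"Elems":[{"Elem":1,"Desc":"d1"},{"Elem":2,"Desc":"d2"},{"Elem":3,"Desc":"d3"}]}"""
).toDS())

val mod_df = df.withColumn("modif_elems",
  arrays_zip(
    array(lit("")),   // placeholder array; shorter, so zipped entries are null-padded
    col("Elems.Elem"),
    col("Elems.Desc")
  ).cast("array<struct<New_field:string,Elem:long,Desc:string>>"))

mod_df.printSchema()
```

Note that `Elem` is `long` here only because `spark.read.json` infers integers as `LongType`; adjust the cast's field types to match your actual schema.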
Spark 3.1+

Scala

Input:
val df = spark.createDataFrame(Seq((1, "2")))
.select(
array(struct(
col("_1").as("Elem"),
col("_2").as("Desc")
)).as("Elems")
)
df.printSchema()
// root
// |-- Elems: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- Elem: integer (nullable = true)
// | | |-- Desc: string (nullable = true)
Script:
val df2 = df.withColumn(
"Elems",
transform(
$"Elems",
x => x.withField("New_field", lit(3))
)
)
df2.printSchema()
// root
// |-- Elems: array (nullable = false)
// | |-- element: struct (containsNull = false)
// | | |-- Elem: integer (nullable = true)
// | | |-- Desc: string (nullable = true)
// | | |-- New_field: integer (nullable = false)
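`withField` is not limited to literals: the new value can be computed from the struct it is applied to. A small sketch (the `* 10` derivation is made up for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Sketch (Spark 3.1+): derive New_field from an existing field of each struct.
val spark = SparkSession.builder.master("local[1]").appName("demo").getOrCreate()

val df = spark.createDataFrame(Seq((1, "2"), (4, "5")))
  .select(array(struct(col("_1").as("Elem"), col("_2").as("Desc"))).as("Elems"))

val df3 = df.withColumn(
  "Elems",
  transform(col("Elems"), x => x.withField("New_field", x.getField("Elem") * 10))
)

df3.printSchema()
```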
pySpark
Input:
from pyspark.sql import functions as F
df = spark.createDataFrame([(1, "2",)]) \
.select(
F.array(F.struct(
F.col("_1").alias("Elem"),
F.col("_2").alias("Desc")
)).alias("Elems")
)
df.printSchema()
# root
# |-- Elems: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- Elem: long (nullable = true)
# | | |-- Desc: string (nullable = true)
Script:
df = df.withColumn(
"Elems",
F.transform(
F.col("Elems"),
lambda x: x.withField("New_field", F.lit(3))
)
)
df.printSchema()
# root
# |-- Elems: array (nullable = false)
# | |-- element: struct (containsNull = false)
# | | |-- Elem: long (nullable = true)
# | | |-- Desc: string (nullable = true)
# | | |-- New_field: integer (nullable = false)