Spark - How to add a new field to an array of structs

rvi*_*lla 7 arrays struct dataframe apache-spark apache-spark-sql

Given this schema:

root
 |-- Elems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Elem: integer (nullable = true)
 |    |    |-- Desc: string (nullable = true)

How can we add a new field, like this?

root
 |-- Elems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- New_field: integer (nullable = true)
 |    |    |-- Elem: integer (nullable = true)
 |    |    |-- Desc: string (nullable = true)

I have already done this with a plain struct (more details at the bottom of this post), but I can't manage to do it with an array of structs.

Here is the code to test it:

import org.apache.spark.sql.types._
import spark.implicits._

val schema = new StructType()
    .add("Elems", ArrayType(new StructType()
        .add("Elem", IntegerType)
        .add("Desc", StringType)
    ))

val dataDS = Seq("""
{
  "Elems": [ {"Elem":1, "Desc": "d1"}, {"Elem":2, "Desc": "d2"}, {"Elem":3, "Desc": "d3"} ]
}
""").toDS()

val df = spark.read.schema(schema).json(dataDS)

df.show(false)
+---------------------------+
|Elems                      |
+---------------------------+
|[[1, d1], [2, d2], [3, d3]]|
+---------------------------+

Once we have the DF, the best I have managed is to build a struct of arrays, one array per field:

val mod_df = df.withColumn("modif_elems", 
     struct(
         array(lit("")).as("New_field"),
         col("Elems.Elem"),
         col("Elems.Desc")
                            ))

mod_df.show(false)
+---------------------------+-----------------------------+
|Elems                      |modif_elems                  |
+---------------------------+-----------------------------+
|[[1, d1], [2, d2], [3, d3]]|[[], [1, 2, 3], [d1, d2, d3]]|
+---------------------------+-----------------------------+


mod_df.printSchema
root
 |-- Elems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Elem: integer (nullable = true)
 |    |    |-- Desc: string (nullable = true)
 |-- modif_elems: struct (nullable = false)
 |    |-- New_field: array (nullable = false)
 |    |    |-- element: string (containsNull = false)
 |    |-- Elem: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |    |-- Desc: array (nullable = true)
 |    |    |-- element: string (containsNull = true)

We don't lose any data, but this is not what I want.

Update: there is a workaround in PD1.


Bonus track: modifying a struct (not inside an array)

The code is almost the same, but now there is no array of structs, so modifying the struct is easier:

val schema = new StructType()
    .add("Elems", new StructType()
        .add("Elem", IntegerType)
        .add("Desc", StringType)
    )


val dataDS = Seq("""
{
  "Elems": {"Elem":1, "Desc": "d1"}
}
""").toDS()    


val df = spark.read.schema(schema).json(dataDS)
df.show(false)
+-------+
|Elems  |
+-------+
|[1, d1]|
+-------+

df.printSchema
root
 |-- Elems: struct (nullable = true)
 |    |-- Elem: integer (nullable = true)
 |    |-- Desc: string (nullable = true)

In this case, to add the field we need to build another struct:

val mod_df = df
    .withColumn("modif_elems", 
                struct(
                    lit("").alias("New_field"),
                    col("Elems.Elem"),
                    col("Elems.Desc")
                    )
               )

mod_df.show
+-------+-----------+
|  Elems|modif_elems|
+-------+-----------+
|[1, d1]|  [, 1, d1]|
+-------+-----------+

mod_df.printSchema
root
 |-- Elems: struct (nullable = true)
 |    |-- Elem: integer (nullable = true)
 |    |-- Desc: string (nullable = true)
 |-- modif_elems: struct (nullable = false)
 |    |-- New_field: string (nullable = false)
 |    |-- Elem: integer (nullable = true)
 |    |-- Desc: string (nullable = true)


PD1:

OK, I have used the arrays_zip Spark SQL function (new in version 2.4.0) and it's almost what I want, but I can't see how to change the element names (`as` and `alias` don't work here):

val mod_df = df.withColumn("modif_elems", 
        arrays_zip(
            array(lit("")).as("New_field"),
            col("Elems.Elem").as("Elem"),
            col("Elems.Desc").alias("Desc")
                    )
        )

mod_df.show(false)
+---------------------------+---------------------------------+
|Elems                      |modif_elems                      |
+---------------------------+---------------------------------+
|[[1, d1], [2, d2], [3, d3]]|[[, 1, d1], [, 2, d2], [, 3, d3]]|
+---------------------------+---------------------------------+

mod_df.printSchema
root
 |-- Elems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Elem: integer (nullable = true)
 |    |    |-- Desc: string (nullable = true)
 |-- modif_elems: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- 0: string (nullable = true)
 |    |    |-- 1: integer (nullable = true)
 |    |    |-- 2: string (nullable = true)


The structs inside modif_elems should contain 3 fields named New_field, Elem and Desc, not 0, 1 and 2.

Zyg*_*ygD 9

Spark 3.1+

withField can be used (together with transform):

  • Scala

    Input:

    val df = spark.createDataFrame(Seq((1, "2")))
        .select(
            array(struct(
                col("_1").as("Elem"),
                col("_2").as("Desc")
            )).as("Elems")
        )
    df.printSchema()
    // root
    //  |-- Elems: array (nullable = true)
    //  |    |-- element: struct (containsNull = true)
    //  |    |    |-- Elem: integer (nullable = true)
    //  |    |    |-- Desc: string (nullable = true)
    

    Script:

    val df2 = df.withColumn(
        "Elems",
        transform(
            $"Elems",
            x => x.withField("New_field", lit(3))
        )
    )
    df2.printSchema()
    // root
    //  |-- Elems: array (nullable = false)
    //  |    |-- element: struct (containsNull = false)
    //  |    |    |-- Elem: long (nullable = true)
    //  |    |    |-- Desc: string (nullable = true)
    //  |    |    |-- New_field: integer (nullable = false)
    
  • pySpark

    Input:

    from pyspark.sql import functions as F
    df = spark.createDataFrame([(1, "2",)]) \
        .select(
            F.array(F.struct(
                F.col("_1").alias("Elem"),
                F.col("_2").alias("Desc")
            )).alias("Elems")
        )
    df.printSchema()
    # root
    #  |-- Elems: array (nullable = true)
    #  |    |-- element: struct (containsNull = true)
    #  |    |    |-- Elem: integer (nullable = true)
    #  |    |    |-- Desc: string (nullable = true)
    

    Script:

    df = df.withColumn(
        "Elems",
        F.transform(
            F.col("Elems"),
            lambda x: x.withField("New_field", F.lit(3))
        )
    )
    df.printSchema()
    # root
    #  |-- Elems: array (nullable = false)
    #  |    |-- element: struct (containsNull = false)
    #  |    |    |-- Elem: long (nullable = true)
    #  |    |    |-- Desc: string (nullable = true)
    #  |    |    |-- New_field: integer (nullable = false)
    