Spark sql如何在不丢失空值的情况下爆炸

ale*_*lov 30 java null apache-spark apache-spark-sql

我有一个Dataframe,我试图压扁.作为整个过程的一部分,我想爆炸它,所以如果我有一列数组,那么数组的每个值都将用于创建一个单独的行.例如,

id | name | likes
_______________________________
1  | Luke | [baseball, soccer]
Run Code Online (Sandbox Code Playgroud)

应该成为

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer
Run Code Online (Sandbox Code Playgroud)

这是我的代码

private DataFrame explodeDataFrame(DataFrame df) {
    DataFrame resultDf = df;
    for (StructField field : df.schema().fields()) {
        if (field.dataType() instanceof ArrayType) {
            resultDf = resultDf.withColumn(field.name(), org.apache.spark.sql.functions.explode(resultDf.col(field.name())));
            resultDf.show();
        }
    }
    return resultDf;
}
Run Code Online (Sandbox Code Playgroud)

问题是在我的数据中,一些数组列有空值.在这种情况下,整个行都将被删除.所以这个数据帧:

id | name | likes
_______________________________
1  | Luke | [baseball, soccer]
2  | Lucy | null
Run Code Online (Sandbox Code Playgroud)

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer
Run Code Online (Sandbox Code Playgroud)

代替

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer
2  | Lucy | null
Run Code Online (Sandbox Code Playgroud)

如何爆炸我的数组,以便我不会丢失空行?

我使用的是Spark 1.5.2和Java 8

zer*_*323 60

Spark 2.2+

你可以使用explode_outer功能:

import org.apache.spark.sql.functions.explode_outer

df.withColumn("likes", explode_outer($"likes")).show

// +---+----+--------+
// | id|name|   likes|
// +---+----+--------+
// |  1|Luke|baseball|
// |  1|Luke|  soccer|
// |  2|Lucy|    null|
// +---+----+--------+
Run Code Online (Sandbox Code Playgroud)

Spark <= 2.1

在Scala中,Java等效应该几乎相同(导入单个函数使用import static).

import org.apache.spark.sql.functions.{array, col, explode, lit, when}

val df = Seq(
  (1, "Luke", Some(Array("baseball", "soccer"))),
  (2, "Lucy", None)
).toDF("id", "name", "likes")

df.withColumn("likes", explode(
  when(col("likes").isNotNull, col("likes"))
    // If null explode an array<string> with a single null
    .otherwise(array(lit(null).cast("string")))))
Run Code Online (Sandbox Code Playgroud)

这里的想法是基本取代NULLarray(NULL)希望的类型.对于复杂类型(aka structs),您必须提供完整的模式:

val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y")

val st =  StructType(Seq(
  StructField("_1", IntegerType, false), StructField("_2", StringType, true)
))

dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast(st)))))
Run Code Online (Sandbox Code Playgroud)

要么

dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast("struct<_1:int,_2:string>")))))
Run Code Online (Sandbox Code Playgroud)

注意:

如果Column已使用containsNullset 创建数组,false则应首先更改此数组(使用Spark 2.1测试):

df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true)))
Run Code Online (Sandbox Code Playgroud)

  • @hamed 对于小的语法差异,它实际上是相同的纠正(例如“isNotNull()”而不是“isNotNull”)。 (2认同)

小智 7

您可以使用explode_outer()函数。