Merging rows in a Spark Scala DataFrame

Dar*_*hah 5 scala dataframe apache-spark


I have data like the following:

ID  Name    Passport    Country  License    UpdatedtimeStamp
1   Ostrich 12345       -       ABC         11-02-2018
1   -       -           -       BCD         10-02-2018
1   Shah    12345       -       -           12-02-2018
2   PJ      -           ANB     a           10-02-2018

The desired output is:

ID  Name    Passport    Country  License    UpdatedtimeStamp
1   Shah    12345       -       ABC         12-02-2018
2   PJ      -           ANB     a           10-02-2018

Basically, rows with the same ID should be merged: for each column, the output should carry the most recently updated non-null value, and a column should stay null only if it is null in every row for that ID.

Please suggest an approach. Also, please suggest something other than Spark SQL Window functions if possible, because I need this to be very fast.
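
To make the rule concrete, here is a minimal pure-Scala sketch of the merge semantics alone, on rows sorted newest first (the Rec case class and its two fields are hypothetical, purely for illustration):

case class Rec(name: Option[String], license: Option[String])

// The question's ID 1 rows, newest first, reduced to two fields.
val newestFirst = Seq(
  Rec(Some("Shah"), None),            // 12-02-2018
  Rec(Some("Ostrich"), Some("ABC")),  // 11-02-2018
  Rec(None, Some("BCD"))              // 10-02-2018
)

// Per field, keep the first defined value; None survives only if all rows are None.
val merged = newestFirst.reduce { (a, b) =>
  Rec(a.name.orElse(b.name), a.license.orElse(b.license))
}
// merged == Rec(Some("Shah"), Some("ABC")), matching the expected row for ID 1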

mik*_*keL 2

If you want to stay entirely within Spark SQL:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (1, Some("Ostrich"), Some(12345), None, Some("ABC"), "11-02-2018"),
  (1, None, None, None, Some("BCD"), "10-02-2018"),
  (1, Some("Shah"), Some(12345), None, None, "12-02-2018"),
  (2, Some("PJ"), None, Some("ANB"), Some("a"), "10-02-2018")
).toDF("ID", "Name", "Passport", "Country", "License", "UpdatedtimeStamp")


// Parse the timestamp (MM-dd-yyyy, matching the dates in the output below)
// and drop the raw string column.
val df1 = df
  .withColumn("date", to_date($"UpdatedtimeStamp", "MM-dd-yyyy"))
  .drop("UpdatedtimeStamp")

// Number the rows within each ID, newest first.
val win = Window.partitionBy("ID").orderBy($"date".desc)

val df2 = df1.select($"*", row_number().over(win).as("r")).orderBy($"ID", $"r").drop("r")

// collect_list skips nulls, so after the newest-first sort the head of each
// collected list is the most recent non-null value for that column; a column
// that is null in every row yields an empty list.
val exprs = df2.columns.drop(1).map(x => collect_list(x).as(x + "_grp"))

val df3 = df2.groupBy("ID").agg(exprs.head, exprs.tail: _*)

// Take element 0 of every list (null if the list is empty) and restore the
// original column names.
val exprs2 = df3.columns.drop(1).map(x => col(x)(0).as(x.stripSuffix("_grp")))

df3.select((Array(col(df2.columns(0))) ++ exprs2): _*).show


+---+----+--------+-------+-------+----------+
| ID|Name|Passport|Country|License|      date|
+---+----+--------+-------+-------+----------+
|  1|Shah|   12345|   null|    ABC|2018-12-02|
|  2|  PJ|    null|    ANB|      a|2018-10-02|
+---+----+--------+-------+-------+----------+
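
A caveat on the above: Spark does not formally guarantee that collect_list preserves the orderBy order across a groupBy. Since the question also asked to avoid Window functions, here is a sketch of an aggregation-only variant of the same idea, reusing df1 from above. For each value column it pairs non-null values with their date in a struct and takes the max (structs compare field by field, so the date decides), then extracts the value; aggregate functions skip null inputs, so a column that is null in every row stays null.

val valueCols = Seq("Name", "Passport", "Country", "License")

// Newest non-null value per column: rows where the column is null produce a
// null struct (the `when` has no `otherwise`), which max simply ignores.
val latestNonNull = valueCols.map { c =>
  max(when(col(c).isNotNull, struct(col("date"), col(c)))).getField(c).as(c)
}

df1.groupBy("ID")
  .agg(latestNonNull.head, (latestNonNull.tail :+ max($"date").as("date")): _*)
  .show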