如何对 spark Dataframe 执行合并操作?

vat*_*ada 2 scala dataframe apache-spark apache-spark-sql

我有火花数据框,mainDF并且deltaDF都具有匹配的模式。

mainDF的内容如下:

id | name | age
1  | abc  | 23
2  | xyz  | 34
3  | pqr  | 45
Run Code Online (Sandbox Code Playgroud)

内容deltaDF如下:

id | name | age
1  | lmn  | 56
4  | efg  | 37
Run Code Online (Sandbox Code Playgroud)

我想合并deltaDFmainDF基于的价值id。因此,如果我id已经存在,mainDF则应更新记录,如果id不存在,则应添加新记录。所以生成的数据框应该是这样的:

id | name | age
1  | lmn  | 56
2  | xyz  | 34
3  | pqr  | 45
4  | efg  | 37
Run Code Online (Sandbox Code Playgroud)

这是我当前的代码,它正在工作:

  val updatedDF = mainDF.as("main").join(deltaDF.as("delta"),$"main.id" === $"delta.id","inner").select($"main.id",$"main.name",$"main.age")
 mainDF= mainDF.except(updateDF).unionAll(deltaDF)
Run Code Online (Sandbox Code Playgroud)

但是,在这里我需要在 select 函数中再次明确提供列表列,这对我来说是开销。有没有其他更好/更清洁的方法来实现相同的目标?

Tza*_*har 6

如果您不想显式提供列列表,则可以映射原始 DF 的列,例如:

.select(mainDF.columns.map(c => $"main.$c" as c): _*)
Run Code Online (Sandbox Code Playgroud)

顺便说一句,您可以在没有union之后执行此操作join:您可以使用outerjoin 获取两个 DF 中都不存在的记录,然后使用coalesce“选择”非空值偏好deltaDF的值。所以完整的解决方案将类似于:

val updatedDF = mainDF.as("main")
  .join(deltaDF.as("delta"), $"main.id" === $"delta.id", "outer")
  .select(mainDF.columns.map(c => coalesce($"delta.$c", $"main.$c") as c): _*)

updatedDF.show
// +---+----+---+
// | id|name|age|
// +---+----+---+
// |  1| lmn| 56|
// |  3| pqr| 45|
// |  4| efg| 37|
// |  2| xyz| 34|
// +---+----+---+
Run Code Online (Sandbox Code Playgroud)