vat*_*ada 2 scala dataframe apache-spark apache-spark-sql
我有火花数据框,mainDF并且deltaDF都具有匹配的模式。
mainDF的内容如下:
id | name | age
1 | abc | 23
2 | xyz | 34
3 | pqr | 45
Run Code Online (Sandbox Code Playgroud)
内容deltaDF如下:
id | name | age
1 | lmn | 56
4 | efg | 37
Run Code Online (Sandbox Code Playgroud)
我想合并deltaDF与mainDF基于的价值id。因此,如果我id已经存在,mainDF则应更新记录,如果id不存在,则应添加新记录。所以生成的数据框应该是这样的:
id | name | age
1 | lmn | 56
2 | xyz | 34
3 | pqr | 45
4 | efg | 37
Run Code Online (Sandbox Code Playgroud)
这是我当前的代码,它正在工作:
val updatedDF = mainDF.as("main").join(deltaDF.as("delta"),$"main.id" === $"delta.id","inner").select($"main.id",$"main.name",$"main.age")
mainDF= mainDF.except(updateDF).unionAll(deltaDF)
Run Code Online (Sandbox Code Playgroud)
但是,在这里我需要在 select 函数中再次明确提供列表列,这对我来说是开销。有没有其他更好/更清洁的方法来实现相同的目标?
如果您不想显式提供列列表,则可以映射原始 DF 的列,例如:
.select(mainDF.columns.map(c => $"main.$c" as c): _*)
Run Code Online (Sandbox Code Playgroud)
顺便说一句,您可以在没有union之后执行此操作join:您可以使用outerjoin 获取两个 DF 中都不存在的记录,然后使用coalesce“选择”非空值偏好deltaDF的值。所以完整的解决方案将类似于:
val updatedDF = mainDF.as("main")
.join(deltaDF.as("delta"), $"main.id" === $"delta.id", "outer")
.select(mainDF.columns.map(c => coalesce($"delta.$c", $"main.$c") as c): _*)
updatedDF.show
// +---+----+---+
// | id|name|age|
// +---+----+---+
// | 1| lmn| 56|
// | 3| pqr| 45|
// | 4| efg| 37|
// | 2| xyz| 34|
// +---+----+---+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2329 次 |
| 最近记录: |