Spark Dataframe - 在加入时实施 Oracle NVL 功能

RaA*_*aAm 3 scala apache-spark apache-spark-sql spark-dataframe

我需要在加入两个数据帧时在 spark 中实现 NVL 功能。

输入数据帧:

ds1.show()
---------------
|key  | Code  |
---------------
|2    | DST   |
|3    | CPT   |
|null | DTS   |
|5    | KTP   |
---------------

ds2.show()
------------------
|key  | PremAmt |
------------------
|2     | 300   |
|-1    | -99   |
|5     | 567   |
------------------
Run Code Online (Sandbox Code Playgroud)

需要实现 "LEFT JOIN NVL(DS1.key, -1) = DS2.key" 。所以我是这样写的,但是缺少 NVL 或 Coalesce 函数。所以它返回了错误的值。

如何在火花数据帧中加入“NVL”?

// nvl function is missing, so wrong output
ds1.join(ds1,Seq("key"),"left_outer")

-------------------------
|key  | Code  |PremAmt  |
-------------------------
|2    | DST   |300      |
|3    | CPT   |null     |
|null | DTS   |null     |
|5    | KTP   |567      |
-------------------------
Run Code Online (Sandbox Code Playgroud)

预期结果 :

-------------------------
|key  | Code  |PremAmt  |
-------------------------
|2    | DST   |300      |
|3    | CPT   |null     |
|null | DTS   |-99      |
|5    | KTP   |567      |
-------------------------
Run Code Online (Sandbox Code Playgroud)

小智 5

我知道一种复杂的方法。

 val df = df1.join(df2, coalesce(df1("key"), lit(-1)) === df2("key"), "left_outer")
Run Code Online (Sandbox Code Playgroud)

您应该重命名一个 df 的列名“key”,并在加入后删除该列。


小智 5

nvl 在 Scala 中的实现

import org.apache.spark.sql.Column;
import org.apache.spark.sql.functions.{when, lit};

def nvl(ColIn: Column, ReplaceVal: Any): Column = {
  return(when(ColIn.isNull, lit(ReplaceVal)).otherwise(ColIn))
}
Run Code Online (Sandbox Code Playgroud)

现在您可以使用 nvl,就像使用任何其他函数进行数据帧操作一样,例如

val NewDf = DF.withColumn("MyColNullsReplaced", nvl($"MyCol", "<null>"))
Run Code Online (Sandbox Code Playgroud)

显然,Replaceval必须是正确的类型。上面的例子假设$"MyCol"是字符串类型。