使用数据框中多个其他列的值将新列添加到Dataframe - spark/scala

Hem*_*nth 4 scala dataframe apache-spark apache-spark-sql spark-dataframe

我是新手来激发SQL和Dataframes.我有一个Dataframe我应该根据其他列的值添加新列.我有一个Nested IFexcel 的公式,我应该实现(用于向新列添加值),在转换为程序化术语时,它是这样的:

if(k =='yes')
{
  if(!(i==''))
  {
    if(diff(max_date, target_date) < 0)
    {
      if(j == '')
      {
        "pending" //the value of the column
      }
      else {
        "approved" //the value of the column
      }
    }
    else{
      "expired" //the value of the column
    }
  }
  else{
    "" //the value should be empty
  }
}
else{
  "" //the value should be empty
} 
Run Code Online (Sandbox Code Playgroud)

i,j,k are three other columns in the Dataframe.我知道我们可以使用withColumnwhen添加基于其他列的新列,但我不确定如何使用该方法实现上述逻辑.

在添加新列时实现上述逻辑的简单/有效方法是什么?任何帮助,将不胜感激.

谢谢.

soo*_*ote 6

首先,让我们简化if语句:

if(k == "yes" && i.nonEmpty)
  if(maxDate - targetDate < 0)
    if (j.isEmpty) "pending" 
    else "approved"
  else "expired"
else ""
Run Code Online (Sandbox Code Playgroud)

现在有两种主要方法可以实现这一目标

  1. 使用自定义UDF
  2. 使用内置函数火花:coalesce,when,otherwise

自定义UDF

现在由于条件的复杂性,编号2会相当棘手.使用自定义UDF应该适合您的需求.

def getState(i: String, j: String, k: String, maxDate: Long, targetDate: Long): String =  
  if(k == "yes" && i.nonEmpty)
    if(maxDate - targetDate < 0)
      if (j.isEmpty) "pending" 
      else "approved"
    else "expired"
  else ""

val stateUdf = udf(getState _)
df.withColumn("state", stateUdf($"i",$"j",$"k",lit(0),lit(0)))
Run Code Online (Sandbox Code Playgroud)

只需将lit(0)和lit(0)更改为您的日期代码,这应该适合您.

使用spark内置函数

如果您发现性能问题,您可以切换到使用coalesce,otherwisewhen,这将是这个样子:

val isApproved = df.withColumn("state", when($"k" === "yes" && $"i" =!= "" && (lit(max_date) - lit(target_date) < 0) && $"j" =!= "", "approved").otherwise(null))
val isPending = isApproved.withColumn("state", coalesce($"state", when($"k" === "yes" && $"i" =!= "" && (lit(max_date) - lit(target_date) < 0) && $"j" === "", "pending").otherwise(null)))
val isExpired = isPending.withColumn("state", coalesce($"state", when($"k" === "yes" && $"i" =!= "" && (lit(max_date) - lit(target_date) >= 0), "expired").otherwise(null)))
val finalDf = isExpired.withColumn("state", coalesce($"state", lit("")))
Run Code Online (Sandbox Code Playgroud)

我过去使用自定义udf来使用大型输入源而没有问题,而自定义udf可以使代码更易读,特别是在这种情况下.