Spark UDF with non-column parameters

Geo*_*ler 2 scala user-defined-functions apache-spark apache-spark-sql udf

I want to pass a variable, rather than a column, to a UDF in Spark.

The map has the format described in "Spark dataframe to nested map".

val joinUDF = udf((replacementLookup: Map[String, Double], newValue: String) => {
    replacementLookup.get(newValue) match {
      case Some(tt) => tt
      case None => 0.0
    }
  })

It should be applied like this:

(columnsMap).foldLeft(df) {
    (currentDF, colName) =>
      {
        println(colName._1)
        println(colName._2)
        currentDF
          .withColumn("myColumn_" + colName._1, joinUDF(colName._2, col(colName._1)))
      }
  }

But it fails with:

type mismatch;
[error]  found   : Map
[error]  required: org.apache.spark.sql.Column
[error]           .withColumn("myColumn_" + colName._1, joinUDF(colName._2, col(colName._1)))

Dan*_*ula 6

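A UDF can only be applied to Column arguments, so the map has to be captured by the function rather than passed in as an argument. You can use currying: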

import org.apache.spark.sql.functions._
// Outside spark-shell, also import spark.implicits._ (needed for toDF and $"...")
val df = Seq(("a", 1), ("b", 2)).toDF("StringColumn", "IntColumn")

def joinUDF(replacementLookup: Map[String, Double]) = udf((newValue: String) => {
  replacementLookup.get(newValue) match {
    case Some(tt) => tt
    case None => 0.0
  }
})

val myMap = Map("a" -> 1.5, "b" -> 3.0)

df.select(joinUDF(myMap)($"StringColumn")).show()
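To connect this back to the foldLeft in the question, here is a minimal sketch of how the curried UDF might be applied once per column. It assumes columnsMap has the type Map[String, Map[String, Double]] (column name to lookup map), which is inferred from how the question uses colName._1 and colName._2. The object name, the helper addLookupColumns, and the sample data are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}

object CurriedUdfFold {
  // The lookup map is an ordinary Scala argument; only newValue comes from a Column.
  def joinUDF(replacementLookup: Map[String, Double]): UserDefinedFunction =
    udf((newValue: String) => replacementLookup.getOrElse(newValue, 0.0))

  // Hypothetical helper: adds one "myColumn_<name>" column per entry in columnsMap.
  def addLookupColumns(df: DataFrame,
                       columnsMap: Map[String, Map[String, Double]]): DataFrame =
    columnsMap.foldLeft(df) { case (currentDF, (colName, lookup)) =>
      currentDF.withColumn("myColumn_" + colName, joinUDF(lookup)(col(colName)))
    }

  def main(args: Array[String]): Unit = {
    // A local session, assumed here only so the sketch is self-contained.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("curried-udf-fold")
      .getOrCreate()
    import spark.implicits._

    // Sample data (assumption): two string columns and a lookup map for each.
    val df = Seq(("a", "x"), ("b", "y")).toDF("c1", "c2")
    val columnsMap = Map(
      "c1" -> Map("a" -> 1.5, "b" -> 3.0),
      "c2" -> Map("x" -> 10.0)
    )

    addLookupColumns(df, columnsMap).show()
    spark.stop()
  }
}
```

Because joinUDF(lookup) returns a UDF that takes a single Column, the call inside withColumn now type-checks: the map never has to be converted into a Column.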

Alternatively, you can try using a broadcast variable:

import org.apache.spark.sql.functions._
// Outside spark-shell, also import spark.implicits._ (needed for toDF and $"...")
val df = Seq(("a", 1), ("b", 2)).toDF("StringColumn", "IntColumn")

val myMap = Map("a" -> 1.5, "b" -> 3.0)
// sc is the SparkContext (spark.sparkContext when you only have a SparkSession)
val broadcastedMap = sc.broadcast(myMap)

// val rather than def, so the UDF is built once instead of on every reference
val joinUDF = udf((newValue: String) => {
  broadcastedMap.value.get(newValue) match {
    case Some(tt) => tt
    case None => 0.0
  }
})

df.select(joinUDF($"StringColumn")).show()