如何在Spark SQL中将额外参数传递给UDF?

Dar*_*ero 16 scala user-defined-functions apache-spark apache-spark-sql

我想解析a中的日期列DataFrame,对于每个日期列,日期的分辨率可能会发生变化(例如,如果分辨率设置为"月",则为2011/01/10 => 2011/01).

我写了以下代码:

def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame =
{
  import org.apache.spark.sql.functions._
  val convertDateFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)}
  val convertDateTimeFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)}

  val allColNames = dataframe.columns
  val allCols = allColNames.map(name => dataframe.col(name))

  val mappedCols =
  {
    for(i <- allCols.indices) yield
    {
      schema(i) match
      {
        case FieldDataType.Date => convertDateFunc(allCols(i), resolution(i)))
        case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i))
        case _ => allCols(i)
      }
    }
  }

  dataframe.select(mappedCols:_*)

}}
Run Code Online (Sandbox Code Playgroud)

但它不起作用.似乎我只能将Columns 传递给UDF.我想知道如果我将每个行转换DataFrameRDD并应用函数,它是否会非常慢.

有谁知道正确的解决方案?谢谢!

zer*_*323 39

只需使用一点点currying:

def convertDateFunc(resolution: DateResolutionType) = udf((x:String) => 
  SparkDateTimeConverter.convertDate(x, resolution))
Run Code Online (Sandbox Code Playgroud)

并按如下方式使用:

case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i))
Run Code Online (Sandbox Code Playgroud)

另外,你应该看看sql.functions.truncsql.functions.date_format.这些至少应该是部分工作而根本不使用UDF.

注意:

在Spark 2.2或更高版本中,您可以使用以下typedLit功能:

import org.apache.spark.sql.functions.typedLit
Run Code Online (Sandbox Code Playgroud)

它支持更广泛的文字,如SeqMap.

  • 我写了一个关于如何使用currying来创建Spark UDF的教程,它在调用时接受额外的参数.https://gist.github.com/andrearota/5910b5c5ac65845f23856b2415474c38 (7认同)

Mic*_*ust 16

您可以Column使用在中lit(...)定义的函数创建要传递给udf 的文字org.apache.spark.sql.functions

例如:

val takeRight = udf((s: String, i: Int) => s.takeRight(i))
df.select(takeRight($"stringCol", lit(1)))
Run Code Online (Sandbox Code Playgroud)