Spark UDF与varargs

dev*_*ife 18 scala apache-spark udf

如文档中所示,它是列出最多22个所有参数的唯一选项吗?

https://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.UDFRegistration

有人想出如何做类似的事情吗?

sc.udf.register("func", (s: String*) => s......
Run Code Online (Sandbox Code Playgroud)

(编写跳过空值的自定义concat函数,当时只有2个参数)

谢谢

zer*_*323 37

UDF不支持varargs*但您可以使用array函数传递包含的任意数量的列:

import org.apache.spark.sql.functions.{udf, array, lit}

val myConcatFunc = (xs: Seq[Any], sep: String) => 
  xs.filter(_ != null).mkString(sep)

val myConcat = udf(myConcatFunc)
Run Code Online (Sandbox Code Playgroud)

示例用法:

val  df = sc.parallelize(Seq(
  (null, "a", "b", "c"), ("d", null, null, "e")
)).toDF("x1", "x2", "x3", "x4")

val cols = array($"x1", $"x2", $"x3", $"x4")
val sep = lit("-")

df.select(myConcat(cols, sep).alias("concatenated")).show

// +------------+
// |concatenated|
// +------------+
// |       a-b-c|
// |         d-e|
// +------------+
Run Code Online (Sandbox Code Playgroud)

使用原始SQL:

df.registerTempTable("df")
sqlContext.udf.register("myConcat", myConcatFunc)

sqlContext.sql(
    "SELECT myConcat(array(x1, x2, x4), '.') AS concatenated FROM df"
).show

// +------------+
// |concatenated|
// +------------+
// |         a.c|
// |         d.e|
// +------------+
Run Code Online (Sandbox Code Playgroud)

稍微复杂一点的方法是根本不使用UDF,并使用大致如下的东西组成SQL表达式:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column

def myConcatExpr(sep: String, cols: Column*) = regexp_replace(concat(
  cols.foldLeft(lit(""))(
    (acc, c) => when(c.isNotNull, concat(acc, c, lit(sep))).otherwise(acc)
  )
), s"($sep)?$$", "") 

df.select(
  myConcatExpr("-", $"x1", $"x2", $"x3", $"x4").alias("concatenated")
).show
// +------------+
// |concatenated|
// +------------+
// |       a-b-c|
// |         d-e|
// +------------+
Run Code Online (Sandbox Code Playgroud)

但我怀疑除非你使用PySpark,否则值得付出努力.


*如果你使用varargs传递一个函数,它将被从所有的语法糖中剥离,结果UDF会期望一个ArrayType.例如:

def f(s: String*) = s.mkString
udf(f _)
Run Code Online (Sandbox Code Playgroud)

将是类型:

UserDefinedFunction(<function1>,StringType,List(ArrayType(StringType,true)))
Run Code Online (Sandbox Code Playgroud)

  • @Kalpesh`array(df.columns.map(c => struct(lit(c),col(c)):_*)` - >`udf(xs:Seq [Row] => ???)`. (2认同)