How to pass the entire row to a UDF - Spark DataFrame filter

Mic*_*ser 21 apache-spark

I'm writing a filter function for a complex JSON dataset with a lot of nested structure. Passing individual columns is too cumbersome.

So I declared the following UDF:

val records: DataFrame = sqlContext.jsonFile("...")
def myFilterFunction(r: Row): Boolean = ???
sqlc.udf.register("myFilter", (r: Row) => myFilterFunction(r))

Intuitively I assumed it would work something like this:

records.filter("myFilter(*)=true")

What is the actual syntax?

ags*_*hin 23

You have to use the struct() function to build the row when calling the UDF. Follow these steps.

Import Row:

import org.apache.spark.sql._

Define the UDF:

def myFilterFunction(r: Row) = { r.get(0) == r.get(1) }

Register the UDF:

sqlContext.udf.register("myFilterFunction", myFilterFunction _)

Create a DataFrame:

val records = sqlContext.createDataFrame(Seq(("sachin", "sachin"), ("aggarwal", "aggarwal1"))).toDF("text", "text2")

Use the UDF:

records.filter(callUdf("myFilterFunction", struct($"text", $"text2"))).show

When you want to pass all the columns to the UDF:

records.filter(callUdf("myFilterFunction", struct(records.columns.map(records(_)): _*))).show

Result:

+------+------+
|  text| text2|
+------+------+
|sachin|sachin|
+------+------+

  • (Very late, and I'm sure you have an answer by now, but: `struct(df.columns.map(df(_)): _*)`) (7 upvotes)
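Putting the steps above together, here is a minimal end-to-end sketch. It assumes a Spark 1.x `SQLContext` bound to `sqlContext` (on newer Spark versions the same idea applies via `SparkSession`, with `callUDF` replacing the deprecated `callUdf`):

```scala
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{callUdf, struct}

// The UDF receives the whole row packed into a single struct column,
// so it can inspect any field by position.
def myFilterFunction(r: Row): Boolean = r.get(0) == r.get(1)

sqlContext.udf.register("myFilterFunction", myFilterFunction _)

val records = sqlContext
  .createDataFrame(Seq(("sachin", "sachin"), ("aggarwal", "aggarwal1")))
  .toDF("text", "text2")

// struct(cols: _*) splats every column of the DataFrame into one
// struct argument, so the UDF effectively sees the entire row.
records
  .filter(callUdf("myFilterFunction", struct(records.columns.map(records(_)): _*)))
  .show()
// keeps only the row where text == text2
```

The key trick is `records.columns.map(records(_)): _*`, which turns the array of column names into a varargs list of `Column`s, making the call independent of the schema's width.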