我正在为具有大量内部结构的复杂JSON数据集编写过滤函数.传递单个列太麻烦了.
所以我声明了以下UDF:
val records:DataFrame = = sqlContext.jsonFile("...")
def myFilterFunction(r:Row):Boolean=???
sqlc.udf.register("myFilter", (r:Row)=>myFilterFunction(r))
Run Code Online (Sandbox Code Playgroud)
直觉我觉得它会像这样工作:
records.filter("myFilter(*)=true")
Run Code Online (Sandbox Code Playgroud)
实际的语法是什么?
ags*_*hin 23
在调用struct()函数时必须使用函数构造行,请按照以下步骤操作.
导入行,
import org.apache.spark.sql._
Run Code Online (Sandbox Code Playgroud)
定义UDF
def myFilterFunction(r:Row) = {r.get(0)==r.get(1)}
Run Code Online (Sandbox Code Playgroud)
注册UDF
sqlContext.udf.register("myFilterFunction", myFilterFunction _)
Run Code Online (Sandbox Code Playgroud)
创建dataFrame
val records = sqlContext.createDataFrame(Seq(("sachin", "sachin"), ("aggarwal", "aggarwal1"))).toDF("text", "text2")
Run Code Online (Sandbox Code Playgroud)
使用UDF
records.filter(callUdf("myFilterFunction",struct($"text",$"text2"))).show
Run Code Online (Sandbox Code Playgroud)
当你想要将所有列传递给UDF时.
records.filter(callUdf("myFilterFunction",struct(records.columns.map(records(_)) : _*))).show
Run Code Online (Sandbox Code Playgroud)
结果:
+------+------+
| text| text2|
+------+------+
|sachin|sachin|
+------+------+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
16091 次 |
| 最近记录: |