使用Spark的DataFrame时,需要使用用户定义函数(UDF)来映射列中的数据.UDF要求显式指定参数类型.在我的情况下,我需要操作由对象数组组成的列,我不知道要使用什么类型.这是一个例子:
import sqlContext.implicits._
// Start with some data. Each row (here, there's only one row)
// is a topic and a bunch of subjects
val data = sqlContext.read.json(sc.parallelize(Seq(
"""
|{
| "topic" : "pets",
| "subjects" : [
| {"type" : "cat", "score" : 10},
| {"type" : "dog", "score" : 1}
| ]
|}
""")))
Run Code Online (Sandbox Code Playgroud)
使用内置org.apache.spark.sql.functions函数对列中的数据执行基本操作相对简单
import org.apache.spark.sql.functions.size
data.select($"topic", size($"subjects")).show
+-----+--------------+
|topic|size(subjects)|
+-----+--------------+
| pets| 2|
+-----+--------------+
Run Code Online (Sandbox Code Playgroud)
并且通常很容易编写自定义UDF来执行任意操作
import org.apache.spark.sql.functions.udf
val enhance = udf { topic : …Run Code Online (Sandbox Code Playgroud) scala user-defined-functions dataframe apache-spark apache-spark-sql
问题
我想在Java中创建一个用户定义的函数,可以在Apache Spark运算符链中作为Java方法调用.我在查找不需要在SQL查询中存在UDF的Java示例时遇到了麻烦.
版本
我尝试过的是什么
我可以用Java成功创建UDF.但是,我不能使用它,除非它在SQL查询中:
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
sqlContext.udf().register("udfUppercase",
(String string) -> string.toUpperCase(), DataTypes.StringType);
DataFrame oldDF = // a simple DataFrame with a "name" column
oldDF.registerTempTable("df");
DataFrame newDF = sqlContext.sql("SELECT udfUppercase(name) AS name_upper FROM df");
Run Code Online (Sandbox Code Playgroud)
我被困在哪里
我希望Java中的非SQL方法调用样式的UDF看起来像这样:
import static org.apache.spark.sql.functions.udf;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
UserDefinedFunction udfUppercase = udf(
(String string) -> string.toUpperCase(), DataTypes.StringType);
DataFrame oldDF = // a simple DataFrame with a "name" column
newDF = oldDF.withColumn("name_upper", …Run Code Online (Sandbox Code Playgroud) java user-defined-functions dataframe apache-spark apache-spark-sql