如何使用JAVA在Spark DataFrame上调用UDF?

Kai*_*Kai 14 java user-defined-functions apache-spark apache-spark-sql

类似的问题在这里,但没有足够的点来评论那里.

根据最新的Spark 文档,udf可以使用两种不同的方式,一种使用SQL,另一种使用DataFrame.我找到了多个如何使用udfwith sql的例子,但是却找不到任何关于如何udf直接在DataFrame上使用的例子.

由运上上面链接的问题提供解决方案使用__callUDF()__其是_deprecated_根据所述火花Java API文档将在火花2.0被移除.在那里,它说:

"因为它与udf()是多余的"

所以这意味着我应该可以__udf()__用来训练我的udf,但我无法弄清楚如何做到这一点.我没有偶然发现任何说明Java-Spark程序语法的内容.我错过了什么?

import org.apache.spark.sql.api.java.UDF1;
.
.    
UDF1 mode = new UDF1<String[], String>() {
    public String call(final String[] types) throws Exception {
        return types[0];
    }
};

sqlContext.udf().register("mode", mode, DataTypes.StringType);
df.???????? how do I call my udf (mode) on a given column of my DataFrame df?
Run Code Online (Sandbox Code Playgroud)

zer*_*323 22

Spark> = 2.3

udf可以直接调用Scala样式:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;

UserDefinedFunction mode = udf(
  (Seq<String> ss) -> ss.headOption(), DataTypes.StringType
);

df.select(mode.apply(col("vs"))).show();
Run Code Online (Sandbox Code Playgroud)

Spark <2.3

即使我们假设您的UDF很有用并且不能被简单的getItem调用替换它也有不正确的签名.使用Scala WrappedArray而不是普通的Java Arrays 公开数组列,因此您必须调整签名:

UDF1 mode = new UDF1<Seq<String>, String>() {
  public String call(final Seq<String> types) throws Exception {
    return types.headOption();
  }
};
Run Code Online (Sandbox Code Playgroud)

如果UDF已经注册:

sqlContext.udf().register("mode", mode, DataTypes.StringType);
Run Code Online (Sandbox Code Playgroud)

您可以简单地使用callUDF(这是1.5中引入的新功能)来按名称调用它:

df.select(callUDF("mode", col("vs"))).show();
Run Code Online (Sandbox Code Playgroud)

您还可以在selectExprs以下位置使用它:

df.selectExpr("mode(vs)").show();
Run Code Online (Sandbox Code Playgroud)

  • 在Spark&gt; = 2.3中,如何将多个列传递给答案中定义的UDF? (2认同)