How do I create a Spark UDF in Java that accepts an array of strings?

abh*_*hek 1 java apache-spark

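This question has already been asked here for Scala, but that did not help me because I am using the Java API. I have thrown everything but the kitchen sink at it, so here is my approach: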

List<String> sourceClasses = new ArrayList<String>();
//Add elements
List<String> targetClasses = new ArrayList<String>();
//Add elements

dataset = dataset.withColumn("Transformer", callUDF(
    "Transformer",
    lit((String[])sourceClasses.toArray())
        .cast(DataTypes.createArrayType(DataTypes.StringType)),
    lit((String[])targetClasses.toArray())
        .cast(DataTypes.createArrayType(DataTypes.StringType))
));

And here is my UDF declaration:

public class Transformer implements UDF2<Seq<String>, Seq<String>, String> {


//  @SuppressWarnings("deprecation")
public String call(Seq<String> sourceClasses, Seq<String> targetClasses)
    throws Exception {

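When I run the code, execution never gets past the UDF call, which is expected since I cannot get the types to match. Please help me with this.

Edit

I tried the solution suggested by @Oli. However, I get the following exception: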

org.apache.spark.SparkException: Failed to execute user defined function($anonfun$261: (array<string>, array<string>) => string)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.collection.immutable.Seq
at com.esrx.dqm.uuid.UUIDTransformerEngine$1.call(UUIDTransformerEngine.java:1)
at org.apache.spark.sql.UDFRegistration$$anonfun$261.apply(UDFRegistration.scala:774)
... 22 more

This line in particular seems to point to the problem:

Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.collection.immutable.Seq

Oli*_*Oli 5

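From what I understand of the types in your UDF, you are trying to create a UDF that takes two arrays as input and returns a string.

In Java this is a bit painful, but manageable.

Say you want to join the elements of each array and link the two results with the word AND. You could define the UDF as follows: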

UDF2<WrappedArray<String>, WrappedArray<String>, String> my_udf2 =
    new UDF2<WrappedArray<String>, WrappedArray<String>, String>() {
        @Override
        public String call(WrappedArray<String> a1, WrappedArray<String> a2) throws Exception {
            // Convert the Scala arrays to Java collections so they can be handled in Java.
            ArrayList<String> l1 = new ArrayList<>(JavaConverters
                .asJavaCollectionConverter(a1)
                .asJavaCollection());
            ArrayList<String> l2 = new ArrayList<>(JavaConverters
                .asJavaCollectionConverter(a2)
                .asJavaCollection());
            // Join each array with commas and link the two results with " AND ".
            return String.join(",", l1) + " AND " + String.join(",", l2);
        }
    };
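If you want to check the joining logic without a Spark session, here is a minimal plain-Java sketch of what the body of call does once the arrays have been converted to Java collections. The class name JoinSketch and the method joinWithAnd are made up for illustration; they are not part of Spark.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class, not part of Spark: it mirrors only the string
// logic of the UDF body, applied to plain Java lists.
final class JoinSketch {

    // Joins each list with commas and links the two results with " AND ".
    static String joinWithAnd(List<String> left, List<String> right) {
        return String.join(",", left) + " AND " + String.join(",", right);
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("0", "0");
        List<String> data = Arrays.asList("abcd", "efgh", "ijkl");
        // prints: 0,0 AND abcd,efgh,ijkl
        System.out.println(joinWithAnd(ids, data));
    }
}
```

Keeping the string logic in a small static method like this would also let you unit test it separately from the UDF wrapper.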

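Note that you need to use Scala's WrappedArray in the method signature and convert the arrays in the method body with JavaConverters to be able to manipulate them in Java. Here are the necessary imports, just in case.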

import scala.collection.mutable.WrappedArray;
import scala.collection.JavaConverters;

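You can then register the UDF and use it with Spark. To try it out, I created a sample dataframe with a dummy array built from the "id" column and passed a second array with the lit function, as you were trying to do in your question.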

spark.udf().register("my_udf2", my_udf2, DataTypes.StringType);

String[] data = {"abcd", "efgh", "ijkl"};

spark.range(3)
    .withColumn("id", col("id").cast("string"))
    .withColumn("array", functions.array(col("id"), col("id")))
    .withColumn("string_of_arrays",
          functions.callUDF("my_udf2", col("array"), lit(data)))
    .show(false);

which yields:

+---+------+----------------------+
|id |array |string_of_arrays      |
+---+------+----------------------+
|0  |[0, 0]|0,0 AND abcd,efgh,ijkl|
|1  |[1, 1]|1,1 AND abcd,efgh,ijkl|
|2  |[2, 2]|2,2 AND abcd,efgh,ijkl|
+---+------+----------------------+
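Regarding lit: in the example above it receives a String[]. If you start from a List<String> as in your question, the sketch below shows one way to get such an array, assuming the list is an ArrayList. The no-argument toArray() returns an Object[], so casting its result to String[] compiles but throws a ClassCastException at runtime; toArray(new String[0]) returns a real String[]. The class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class for illustration only.
final class ListToArraySketch {

    // Returns a genuine String[] holding the elements of the list.
    static String[] toStringArray(List<String> values) {
        return values.toArray(new String[0]);
    }

    // Returns true if casting the result of the no-argument toArray() fails.
    static boolean castOfToArrayFails(List<String> values) {
        try {
            Object raw = values.toArray();   // an Object[] for an ArrayList
            String[] unused = (String[]) raw; // throws ClassCastException
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> sourceClasses = new ArrayList<>();
        sourceClasses.add("abcd");
        sourceClasses.add("efgh");
        System.out.println(toStringArray(sourceClasses).length); // prints 2
        System.out.println(castOfToArrayFails(sourceClasses));   // prints true
    }
}
```

The resulting array can then be handed to lit the same way data is in the example above.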

In Spark >= 2.3, you can also do it like this:

UserDefinedFunction my_udf2 = udf(
    (WrappedArray<String> s1, WrappedArray<String> s2) -> "some_string",
    DataTypes.StringType
);

df.select(my_udf2.apply(col("a1"), col("a2"))).show(false);