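This question has already been asked here for Scala, but that did not help me because I am using the Java API. I have thrown everything and the kitchen sink at it, so here is my approach: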
List<String> sourceClasses = new ArrayList<>();
// Add elements
List<String> targetClasses = new ArrayList<>();
// Add elements
dataset = dataset.withColumn("Transformer", callUDF(
    "Transformer",
    // toArray(new String[0]) returns a String[]; casting the Object[] from toArray() fails at runtime
    lit(sourceClasses.toArray(new String[0]))
        .cast(DataTypes.createArrayType(DataTypes.StringType)),
    lit(targetClasses.toArray(new String[0]))
        .cast(DataTypes.createArrayType(DataTypes.StringType))
));
And here is my UDF declaration:
public class Transformer implements UDF2<Seq<String>, Seq<String>, String> {
// @SuppressWarnings("deprecation")
public String call(Seq<String> sourceClasses, Seq<String> targetClasses)
throws Exception {
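When I run the code, execution never gets past the UDF call, which is expected because I cannot get the types to match. Please help me with this.
I tried the solution suggested by @Oli. However, I get the following exception: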
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$261: (array<string>, array<string>) => string)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.collection.immutable.Seq
at com.esrx.dqm.uuid.UUIDTransformerEngine$1.call(UUIDTransformerEngine.java:1)
at org.apache.spark.sql.UDFRegistration$$anonfun$261.apply(UDFRegistration.scala:774)
... 22 more
This line in particular seems to point to the problem:
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.collection.immutable.Seq
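A plain-Java sketch of why this kind of cast fails, offered as an analogy rather than Spark code. `WrappedArray` lives in `scala.collection.mutable`, so it is a `scala.collection.Seq` but not a `scala.collection.immutable.Seq`, and a generic `call` method casts its arguments to the declared parameter types before any user code runs. Every name below (`Fn2`, `TOO_NARROW`, `WIDE_ENOUGH`, `invoke`, `failsWithClassCast`) is hypothetical: `Fn2` stands in for `UDF2`, `LinkedList` for the immutable `Seq`, `ArrayList` for `WrappedArray`, and `List` for the common supertype.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

final class BridgeCastDemo {
    // Hypothetical stand-in for a generic two-argument UDF interface.
    interface Fn2<A, B, R> {
        R call(A a, B b);
    }

    // Declares a narrower parameter type than the caller actually supplies.
    static final Fn2<LinkedList<String>, LinkedList<String>, String> TOO_NARROW =
            new Fn2<LinkedList<String>, LinkedList<String>, String>() {
                @Override
                public String call(LinkedList<String> a, LinkedList<String> b) {
                    return a.size() + "/" + b.size();
                }
            };

    // Declares the common supertype, so any List implementation is accepted.
    static final Fn2<List<String>, List<String>, String> WIDE_ENOUGH =
            new Fn2<List<String>, List<String>, String>() {
                @Override
                public String call(List<String> a, List<String> b) {
                    return a.size() + "/" + b.size();
                }
            };

    // Mimics a framework that only sees the erased interface and passes plain objects.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String invoke(Fn2 fn, Object a, Object b) {
        return (String) fn.call(a, b);
    }

    // Reports whether invoking fn with the given arguments raises a ClassCastException.
    @SuppressWarnings("rawtypes")
    static boolean failsWithClassCast(Fn2 fn, Object a, Object b) {
        try {
            invoke(fn, a, b);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> a = new ArrayList<>();
        List<String> b = new ArrayList<>();
        // The compiler-generated bridge method casts ArrayList to LinkedList and fails.
        System.out.println(failsWithClassCast(TOO_NARROW, a, b));
        // The wider declaration accepts the same arguments.
        System.out.println(failsWithClassCast(WIDE_ENOUGH, a, b));
    }
}
```

Under that analogy, declaring the UDF parameters as `scala.collection.Seq<String>` or `WrappedArray<String>` instead of `scala.collection.immutable.Seq<String>` should avoid the failing cast.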
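From what I understand of your UDF's type, you are trying to create a UDF that takes two arrays as input and returns a string.
In Java, that is a bit painful but manageable.
Let's say you want to concatenate the elements of both arrays and link the two results with the word AND. You could define the UDF as follows: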
UDF2<WrappedArray<String>, WrappedArray<String>, String> my_udf2 =
        new UDF2<WrappedArray<String>, WrappedArray<String>, String>() {
    @Override
    public String call(WrappedArray<String> a1, WrappedArray<String> a2) throws Exception {
        // Convert the Scala arrays to Java lists so they can be handled with the Java API.
        ArrayList<String> l1 = new ArrayList<>(JavaConverters
                .asJavaCollectionConverter(a1)
                .asJavaCollection());
        ArrayList<String> l2 = new ArrayList<>(JavaConverters
                .asJavaCollectionConverter(a2)
                .asJavaCollection());
        // Join each list with commas and link the two results with " AND ".
        return l1.stream().collect(Collectors.joining(",")) +
                " AND " +
                l2.stream().collect(Collectors.joining(","));
    }
};
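Note that you need to use Scala's WrappedArray in the method signature and convert the arguments with JavaConverters inside the method body to be able to manipulate them in Java. Here are the necessary imports, just in case.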
import java.util.stream.Collectors;
import org.apache.spark.sql.api.java.UDF2;
import scala.collection.mutable.WrappedArray;
import scala.collection.JavaConverters;
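Once the arguments have been converted to Java collections, the string-building step no longer depends on Spark or Scala. A minimal sketch, assuming you want to keep that logic in a separate helper that can be unit tested on its own (the class `ArrayJoiner` and the method `joinWithAnd` are hypothetical names, not part of the answer's code):

```java
import java.util.Arrays;
import java.util.Collection;

final class ArrayJoiner {
    private ArrayJoiner() {}

    // Joins each collection with "," and links the two results with " AND ".
    static String joinWithAnd(Collection<String> first, Collection<String> second) {
        return String.join(",", first) + " AND " + String.join(",", second);
    }

    public static void main(String[] args) {
        // Same inputs as the first row of the sample dataframe used in this answer.
        System.out.println(joinWithAnd(
                Arrays.asList("0", "0"),
                Arrays.asList("abcd", "efgh", "ijkl")));
        // prints "0,0 AND abcd,efgh,ijkl"
    }
}
```

The UDF's `call` method could then convert its two arguments with `JavaConverters` and delegate to such a helper.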
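Then you can register the UDF so that it can be used with Spark. To try it out, I created a sample dataframe and two dummy arrays from the "id" column. Note that it also works with the lit function, as you were trying to do in your question.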
spark.udf().register("my_udf2", my_udf2, DataTypes.StringType);
String[] data = {"abcd", "efgh", "ijkl"};
spark.range(3)
.withColumn("id", col("id").cast("string"))
.withColumn("array", functions.array(col("id"), col("id")))
.withColumn("string_of_arrays",
functions.callUDF("my_udf2", col("array"), lit(data)))
.show(false);
This yields:
+---+------+----------------------+
|id |array |string_of_arrays |
+---+------+----------------------+
|0 |[0, 0]|0,0 AND abcd,efgh,ijkl|
|1 |[1, 1]|1,1 AND abcd,efgh,ijkl|
|2 |[2, 2]|2,2 AND abcd,efgh,ijkl|
+---+------+----------------------+
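When the values passed to `lit` start out as a Java `List<String>`, the list first has to become a `String[]`. A small sketch of the pitfall, in plain Java with hypothetical names (`ToArrayDemo`, `toStringArray`, `castFails`): the no-argument `toArray()` returns an `Object[]`, so casting its result to `String[]` fails at runtime for an `ArrayList`, whereas `toArray(new String[0])` returns a properly typed array.

```java
import java.util.ArrayList;
import java.util.List;

final class ToArrayDemo {
    private ToArrayDemo() {}

    // Converts a list to a String[] using the typed overload of toArray.
    static String[] toStringArray(List<String> values) {
        return values.toArray(new String[0]);
    }

    // Returns true if casting the Object[] from toArray() to String[] fails at runtime.
    static boolean castFails(List<String> values) {
        try {
            String[] ignored = (String[]) values.toArray();
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> data = new ArrayList<>();
        data.add("abcd");
        data.add("efgh");
        data.add("ijkl");
        System.out.println(castFails(data));            // the cast is rejected for an ArrayList
        System.out.println(toStringArray(data).length); // the typed overload keeps all three elements
    }
}
```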
In Spark >= 2.3, you can also do it like this:
UserDefinedFunction my_udf2 = udf(
(WrappedArray<String> s1, WrappedArray<String> s2) -> "some_string",
DataTypes.StringType
);
df.select(my_udf2.apply(col("a1"), col("a2"))).show(false);