我在Spark SQL中有两列DataFrame,每列中的每个条目都是一个字符串数组.
val ngramDataFrame = Seq(
(Seq("curious", "bought", "20"), Seq("iwa", "was", "asj"))
).toDF("filtered_words", "ngrams_array")
Run Code Online (Sandbox Code Playgroud)
我想合并每行中的数组,以在新列中生成单个数组.我的代码如下:
def concat_array(firstarray: Array[String],
secondarray: Array[String]) : Array[String] =
{ (firstarray ++ secondarray).toArray }
val concatUDF = udf(concat_array _)
val concatFrame = ngramDataFrame.withColumn("full_array", concatUDF($"filtered_words", $"ngrams_array"))
Run Code Online (Sandbox Code Playgroud)
我可以concat_array在两个数组上成功使用该函数.但是,当我运行上面的代码时,我得到以下异常:
org.apache.spark.SparkException:作业因阶段失败而中止:阶段16.0中的任务0失败1次,最近失败:阶段16.0中失去的任务0.0(TID 12,localhost):org.apache.spark.SparkException:失败在org.apache.spark.sql.execution的org.apache.spark.sql.catalyst.expressions.GeneratedClass $ GeneratedIterator.processNext(未知来源)执行用户定义的函数(anonfun $ 1 :(数组,数组)=>数组) .BufferedRowIterator.hasNext(BufferedRowIterator.java:43)at org.apache.spark.sql.execution.WholeStageCodegenExec $$ anonfun $ 8 $$ anon $ 1.hasNext(WholeStageCodegenExec.scala:370)at scala.collection.Iterator $$ anon $ 10 .hasNext(Iterator.scala:389)at sca.collection.Iterator $$ anon $ 11.hasNext(Iterator.scala:408)at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)at at Org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)atg.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)at org.ap ache.spark.scheduler.Task.run(Task.scala:86)at org.apache.spark.executor.Executor $ TaskRunner.run(Executor.scala:274)at …