Posted by Luc*_*rra

Spark Task not serializable with lag Window function

I have noticed that if I call map() on a DataFrame after using a Window function on it, Spark throws a "Task not serializable" exception. This is my code:

val hc: org.apache.spark.sql.hive.HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hc.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
def f(): String = "test"
case class P(name: String, surname: String)
val lag_result: org.apache.spark.sql.Column = lag($"name", 1).over(Window.partitionBy($"surname"))
val lista: List[P] = List(P("N1", "S1"), P("N2", "S2"), P("N2", "S2"))
val data_frame: org.apache.spark.sql.DataFrame = hc.createDataFrame(sc.parallelize(lista))
data_frame.withColumn("lag_result", lag_result).map(x => f) // throws "Task not serializable"
//data_frame.withColumn("lag_result", lag_result).map{case x => def f(): String = "test"; f}.collect // This works
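One common way around this kind of error, sketched below assuming the same hc, lag_result and data_frame defined above (the object name SerializableHelpers is made up for illustration), is to move the helper into a standalone serializable object so that the map closure no longer captures the non-serializable enclosing scope, much like the commented-out line that defines f inside the closure:

// Hypothetical helper object; keeps the closure free of the surrounding (non-serializable) scope
object SerializableHelpers extends Serializable {
  def f(): String = "test"
}

data_frame
  .withColumn("lag_result", lag_result)  // the window column is still computed on the DataFrame side
  .map(x => SerializableHelpers.f())     // the closure now only references a serializable object
  .collect()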

This is the stack trace:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$map$…

serialization scala window-functions apache-spark apache-spark-sql

Score: 10 · Answers: 1 · Views: 2760