ajp*_*619 5 scala apache-spark
As the title says, I want to use a partial function composed with orElse as a UDF in Spark. Here is an example that can be run in the spark shell:
val df = sc.parallelize(1 to 15).toDF("num")
df.show
//Testing out a normal udf - this works
val gt5: (Int => String) = num => (num > 5).toString
val gt5Udf = udf(gt5)
df.withColumn("gt5", gt5Udf(col("num"))).show
//Now create a udf of a partial function composed with orElse
val baseline: PartialFunction[Int, String] = { case _ => "baseline" }
val ge3: PartialFunction[Int, String] = { case x if x >= 3 => ">=3" }
val ge7: PartialFunction[Int, String] = { case x if x >= 7 => ">=7" }
val ge12: PartialFunction[Int, String] = { case x if x >= 12 => ">=12" }
val composed: PartialFunction[Int, String] = ge12 orElse ge7 orElse ge3 orElse baseline
val composedUdf = udf(composed)
//This fails (but this is what I'd like to do)
df.withColumn("pf", composedUdf(col("num"))).show
//Use a partial function not composed with orElse - this works
val baselineUdf = udf(baseline)
df.withColumn("pf", baselineUdf(col("num"))).show
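I am currently running this on a three-node standalone cluster with the following configuration:
I found what I think is a clue in this answer: "Why can Scala serialize Function but not PartialFunction?"
So I tried: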
scala> composed.isInstanceOf[Serializable]
res: Boolean = false
scala> composedUdf.isInstanceOf[Serializable]
res: Boolean = true
scala> baseline.isInstanceOf[Serializable]
res: Boolean = true
scala> baselineUdf.isInstanceOf[Serializable]
res: Boolean = true
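I am getting hazy here, but it seems that composing partial functions with orElse produces an object that is no longer serializable?
The error I find most informative is: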
org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: scala.PartialFunction$OrElse
...
How can I fix this? Or am I off base?
Thanks in advance for your help!
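If you lift it and wrap it in another function, it should work. The value passed to udf is then an ordinary Int => Option[String], and the OrElse object is only created when that function is called: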
val composed: Int => Option[String] =
x => (ge12 orElse ge7 orElse ge3 orElse baseline).lift.apply(x)
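To see why the wrapping helps, here is a minimal sketch that runs without Spark. It assumes that Java serialization is a fair stand-in for what Spark does when it ships a UDF to the executors. The names LiftedOrElseSketch, build, and javaSerializable are hypothetical and exist only for this illustration. Whether the eagerly composed value serializes depends on the Scala version, so the sketch prints the result rather than assuming it.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object LiftedOrElseSketch {

  // Hypothetical helper (not a Spark API): true if Java serialization succeeds.
  def javaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Builds both variants from local vals, similar to definitions typed into a shell.
  def build(): (PartialFunction[Int, String], Int => Option[String]) = {
    // Same partial functions as in the question.
    val baseline: PartialFunction[Int, String] = { case _ => "baseline" }
    val ge3: PartialFunction[Int, String] = { case x if x >= 3 => ">=3" }
    val ge7: PartialFunction[Int, String] = { case x if x >= 7 => ">=7" }
    val ge12: PartialFunction[Int, String] = { case x if x >= 12 => ">=12" }

    // Composed eagerly: this value is itself the result of orElse.
    val eager: PartialFunction[Int, String] = ge12 orElse ge7 orElse ge3 orElse baseline

    // Composed inside a plain function, as in the answer: the closure captures
    // only the four literal partial functions and calls orElse on each invocation.
    val wrapped: Int => Option[String] =
      x => (ge12 orElse ge7 orElse ge3 orElse baseline).lift.apply(x)

    (eager, wrapped)
  }

  def main(args: Array[String]): Unit = {
    val (eager, wrapped) = build()
    println(s"eager serializable:   ${javaSerializable(eager)}")
    println(s"wrapped serializable: ${javaSerializable(wrapped)}")
    Seq(1, 5, 9, 15).foreach(n => println(s"$n -> ${wrapped(n)}"))
  }
}
```

Because baseline matches every input, the Option returned by the wrapped function is always defined in this example. In the spark shell, the wrapped function can be passed to udf in the same way as gt5 in the question.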