小编Pat*_*ick的帖子

使用匿名函数时Spark TaskNotSerializable

背景

这是我的情况:我正在尝试创建一个基于内容的某些功能过滤RDD的类,但该功能在不同的场景中可能有所不同,所以我想用函数对其进行参数化.不幸的是,我似乎遇到了Scala捕获其闭包的问题.即使我的函数是可序列化的,但类不是.

关闭清洁火花源的例子来看,它似乎表明我的情况无法解决,但我确信有一种方法可以通过创建正确(较小)的闭包来实现我想要做的事情.

我的守则

class MyFilter(getFeature: Element => String, other: NonSerializable) {
  def filter(rdd: RDD[Element]): RDD[Element] = {
    // All my complicated logic I want to share
    rdd.filter { elem => getFeature(elem) == "myTargetString" }     
}
Run Code Online (Sandbox Code Playgroud)

简化示例

class Foo(f: Int => Double, rdd: RDD[Int]) { 
  def go(data: RDD[Int]) = data.map(f) 
}

val works = new Foo(_.toDouble, otherRdd)
works.go(myRdd).collect() // works

val myMap = Map(1 -> 10d)
val complicatedButSerializableFunc: Int => Double = x => myMap.getOrElse(x, 0)
val doesntWork …
Run Code Online (Sandbox Code Playgroud)

closures scala serializable apache-spark

4
推荐指数
1
解决办法
1864
查看次数

标签 统计

apache-spark ×1

closures ×1

scala ×1

serializable ×1