使用匿名函数时Spark TaskNotSerializable

Pat*_*ick 4 closures scala serializable apache-spark

背景

这是我的情况:我正在尝试创建一个基于内容的某些功能过滤RDD的类,但该功能在不同的场景中可能有所不同,所以我想用函数对其进行参数化.不幸的是,我似乎遇到了Scala捕获其闭包的问题.即使我的函数是可序列化的,但类不是.

关闭清洁火花源的例子来看,它似乎表明我的情况无法解决,但我确信有一种方法可以通过创建正确(较小)的闭包来实现我想要做的事情.

我的守则

class MyFilter(getFeature: Element => String, other: NonSerializable) {
  def filter(rdd: RDD[Element]): RDD[Element] = {
    // All my complicated logic I want to share
    rdd.filter { elem => getFeature(elem) == "myTargetString" }     
}
Run Code Online (Sandbox Code Playgroud)

简化示例

class Foo(f: Int => Double, rdd: RDD[Int]) { 
  def go(data: RDD[Int]) = data.map(f) 
}

val works = new Foo(_.toDouble, otherRdd)
works.go(myRdd).collect() // works

val myMap = Map(1 -> 10d)
val complicatedButSerializableFunc: Int => Double = x => myMap.getOrElse(x, 0)
val doesntWork = new Foo(complicatedButSerializableFunc, otherRdd)
doesntWork.go(myRdd).collect() // craps out

org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: $iwC$$iwC$Foo
Serialization stack:
    - object not serializable (class: $iwC$$iwC$Foo, value: $iwC$$iwC$Foo@61e33118)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: foo, type: class $iwC$$iwC$Foo)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@47d6a31a)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, <function1>)

// Even though
val out = new ObjectOutputStream(new FileOutputStream("test.obj"))
out.writeObject(complicatedButSerializableFunc) // works
Run Code Online (Sandbox Code Playgroud)

问题

  1. 为什么第一个简单的例子不会尝试序列化所有Foo但第二个呢?
  2. 如何在不包含对Foo闭包的引用的情况下获取对可序列化函数的引用?

Pat*_*ick 8

本文的帮助下找到了答案.

本质上,当为给定函数创建闭包时,Scala将包含引用的任何复杂字段的整个对象(如果有人对第一个简单示例中为什么不会发生这种情况有一个很好的解释,我会接受这个答案).解决方案是将可序列化值传递给不同的函数,以便仅保留最小引用,非常类似于事件侦听器的ol'javascript for-loop范例.

def enclose[E, R](enclosed: E)(func: E => R): R = func(enclosed)

class Foo(f: Int => Double, somethingNonserializable: RDD[String]) { 
 def go(data: RDD[Int]) = enclose(f) { actualFunction => data.map(actualFunction) } 
}
Run Code Online (Sandbox Code Playgroud)

或者使用JS风格的自执行匿名函数

def go(data: RDD[Int]) = ((actualFunction: Int => Double) => data.map(actualFunction))(f)
Run Code Online (Sandbox Code Playgroud)