Spark序列化错误之谜

4le*_*x1v 8 serialization jvm scala apache-spark

假设我有以下代码:

class Context {
  def compute() = Array(1.0)
}
val ctx = new Context
val data = ctx.compute
Run Code Online (Sandbox Code Playgroud)

现在我们在Spark中运行此代码:

val rdd = sc.parallelize(List(1,2,3))
rdd.map(_ + data(0)).count()
Run Code Online (Sandbox Code Playgroud)

上面的代码抛出org.apache.spark.SparkException: Task not serializable.我不是要问如何修复它,通过扩展Serializable或创建一个case类,我想了解错误发生的原因.

我不明白的是为什么它抱怨Context类不是a Serializable,虽然它不是lambda的一部分:rdd.map(_ + data(0)).data这里是一个应该序列化的值数组,但似乎JVM也捕获了ctx引用,在我的理解中,它不应该发生.

据我所知,在shell中,Spark应该从repl上下文中清除lambda.如果我们在delambdafy阶段后打印树,我们会看到这些部分:

object iw extends Object {
  ... 
  private[this] val ctx: $line11.iw$Context = _;
  <stable> <accessor> def ctx(): $line11.iw$Context = iw.this.ctx;
  private[this] val data: Array[Double] = _;
  <stable> <accessor> def data(): Array[Double] = iw.this.data; 
  ...
}

class anonfun$1 ... {
  final def apply(x$1: Int): Double = anonfun$1.this.apply$mcDI$sp(x$1);
  <specialized> def apply$mcDI$sp(x$1: Int): Double = x$1.+(iw.this.data().apply(0));
  ...
}
Run Code Online (Sandbox Code Playgroud)

因此,发送到工作节点的反编译lambda代码是:x$1.+(iw.this.data().apply(0)).部分iw.this属于Spark-Shell会话,因此,据我所知,它应该被ClosureCleaner清除,因为它与逻辑无关,不应该被序列化.无论如何,调用iw.this.data()返回变量的Array[Double]值,该data值在构造函数中初始化:

def <init>(): type = {
  iw.super.<init>();
  iw.this.ctx = new $line11.iw$Context();
  iw.this.data = iw.this.ctx().compute(); // <== here
  iw.this.res4 = ...
  ()
}
Run Code Online (Sandbox Code Playgroud)

在我的理解中,ctx值与lambda无关,它不是一个闭包,因此不应该被序列化.我错过了什么或误解了什么?

Dan*_*don 1

这与 Spark 认为它可以安全地用作闭包的内容有关。这在某些情况下非常直观,因为 Spark 使用反射,并且在许多情况下无法识别 Scala 的一些保证(不是完整的编译器或任何东西)或同一对象中的某些变量不相关的事实。为了安全起见,Spark 将尝试序列化引用的任何对象,在您的情况下包括iw不可序列化的 。

ClosureCleaner里面的代码有一个很好的例子:

例如,在以下场景中需要进行传递清理:

class SomethingNotSerializable {
  def someValue = 1
  def scope(name: String)(body: => Unit) = body
  def someMethod(): Unit = scope("one") {
    def x = someValue
    def y = 2
      scope("two") { println(y + 1) }
  }
}
Run Code Online (Sandbox Code Playgroud)

在此示例中,范围“two”不可序列化,因为它引用范围“one”,而范围“one”又引用 SomethingNotSerialized。但请注意,作用域“two”的主体实际上并不依赖于 SomethingNotSerialized。这意味着我们可以安全地清空克隆作用域“one”的父指针,并将其设置为作用域“two”的父指针,这样作用域“two”不再传递引用 SomethingNotSerialized。

最简单的解决方法可能是在同一范围内创建一个局部变量,从对象中提取值,这样 lambda 内就不再有任何对封装对象的引用:

val rdd = sc.parallelize(List(1,2,3))
val data0 = data(0)
rdd.map(_ + data0).count()
Run Code Online (Sandbox Code Playgroud)