4le*_*x1v 8 serialization jvm scala apache-spark
假设我有以下代码:
class Context {
def compute() = Array(1.0)
}
val ctx = new Context
val data = ctx.compute
Run Code Online (Sandbox Code Playgroud)
现在我们在Spark中运行此代码:
val rdd = sc.parallelize(List(1,2,3))
rdd.map(_ + data(0)).count()
Run Code Online (Sandbox Code Playgroud)
上面的代码抛出org.apache.spark.SparkException: Task not serializable.我不是要问如何修复它,通过扩展Serializable或创建一个case类,我想了解错误发生的原因.
我不明白的是为什么它抱怨Context类不是a Serializable,虽然它不是lambda的一部分:rdd.map(_ + data(0)).data这里是一个应该序列化的值数组,但似乎JVM也捕获了ctx引用,在我的理解中,它不应该发生.
据我所知,在shell中,Spark应该从repl上下文中清除lambda.如果我们在delambdafy阶段后打印树,我们会看到这些部分:
object iw extends Object {
...
private[this] val ctx: $line11.iw$Context = _;
<stable> <accessor> def ctx(): $line11.iw$Context = iw.this.ctx;
private[this] val data: Array[Double] = _;
<stable> <accessor> def data(): Array[Double] = iw.this.data;
...
}
class anonfun$1 ... {
final def apply(x$1: Int): Double = anonfun$1.this.apply$mcDI$sp(x$1);
<specialized> def apply$mcDI$sp(x$1: Int): Double = x$1.+(iw.this.data().apply(0));
...
}
Run Code Online (Sandbox Code Playgroud)
因此,发送到工作节点的反编译lambda代码是:x$1.+(iw.this.data().apply(0)).部分iw.this属于Spark-Shell会话,因此,据我所知,它应该被ClosureCleaner清除,因为它与逻辑无关,不应该被序列化.无论如何,调用iw.this.data()返回变量的Array[Double]值,该data值在构造函数中初始化:
def <init>(): type = {
iw.super.<init>();
iw.this.ctx = new $line11.iw$Context();
iw.this.data = iw.this.ctx().compute(); // <== here
iw.this.res4 = ...
()
}
Run Code Online (Sandbox Code Playgroud)
在我的理解中,ctx值与lambda无关,它不是一个闭包,因此不应该被序列化.我错过了什么或误解了什么?
这与 Spark 认为它可以安全地用作闭包的内容有关。这在某些情况下非常直观,因为 Spark 使用反射,并且在许多情况下无法识别 Scala 的一些保证(不是完整的编译器或任何东西)或同一对象中的某些变量不相关的事实。为了安全起见,Spark 将尝试序列化引用的任何对象,在您的情况下包括iw不可序列化的 。
ClosureCleaner里面的代码有一个很好的例子:
例如,在以下场景中需要进行传递清理:
Run Code Online (Sandbox Code Playgroud)class SomethingNotSerializable { def someValue = 1 def scope(name: String)(body: => Unit) = body def someMethod(): Unit = scope("one") { def x = someValue def y = 2 scope("two") { println(y + 1) } } }在此示例中,范围“two”不可序列化,因为它引用范围“one”,而范围“one”又引用 SomethingNotSerialized。但请注意,作用域“two”的主体实际上并不依赖于 SomethingNotSerialized。这意味着我们可以安全地清空克隆作用域“one”的父指针,并将其设置为作用域“two”的父指针,这样作用域“two”不再传递引用 SomethingNotSerialized。
最简单的解决方法可能是在同一范围内创建一个局部变量,从对象中提取值,这样 lambda 内就不再有任何对封装对象的引用:
val rdd = sc.parallelize(List(1,2,3))
val data0 = data(0)
rdd.map(_ + data0).count()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
985 次 |
| 最近记录: |