火花如何处理物体

Fra*_*kie 7 serialization apache-spark rdd

为了测试spark中的序列化异常,我用2种方式编写了一个任务.
第一种方式:

package examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object dd {
  def main(args: Array[String]):Unit = {
    val sparkConf = new SparkConf
    val sc = new SparkContext(sparkConf)

    val data = List(1,2,3,4,5)
    val rdd = sc.makeRDD(data)
    val result = rdd.map(elem => {
      funcs.func_1(elem)
    })        
    println(result.count())
  }
}

object funcs{
  def func_1(i:Int): Int = {
    i + 1
  }
}
Run Code Online (Sandbox Code Playgroud)

这种方式火花效果很好.
当我将其更改为以下方式时,它不起作用并抛出NotSerializableException.
第二种方式:

package examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object dd {
  def main(args: Array[String]):Unit = {
    val sparkConf = new SparkConf
    val sc = new SparkContext(sparkConf)

    val data = List(1,2,3,4,5)
    val rdd = sc.makeRDD(data)

    val handler = funcs
    val result = rdd.map(elem => {
      handler.func_1(elem)
    })

    println(result.count())

  }
}

object funcs{
  def func_1(i:Int): Int = {
    i + 1
  }
}
Run Code Online (Sandbox Code Playgroud)

我知道我得到错误"任务不可序列化"的原因是因为我试图funcs在第二个例子中将一个不可序列化的对象从驱动程序节点发送到工作节点.对于第二个示例,如果我使对象funcs扩展Serializable,则此错误将消失.

但在我看来,因为它funcs是一个对象而不是一个类,它是一个单例,应该被序列化并从驱动程序发送到工作者而不是在工作节点本身内实例化.在这种情况下,虽然使用对象的funcs方式不同,但我猜funcs在这两个示例中,不可序列化的对象从驱动程序节点传送到工作节点.

我的问题是为什么第一个例子可以成功运行,但第二个例子失败,"task unserializable"异常.

Tim*_*Tim 7

当您在RDD闭包中运行代码(映射,过滤等等)时,执行该代码所需的所有内容将被打包,序列化并发送到执行程序以进行运行.引用的任何对象(或其引用的字段)将在此任务中序列化,这是您有时会获得的对象NotSerializableException.

但是,您的用例有点复杂,并且涉及scala编译器.通常,在scala对象上调用函数等同于调用java静态方法.该对象从未真正存在 - 它基本上就像编写内联代码一样.但是,如果将对象分配给变量,那么实际上是在内存中创建对该对象的引用,并且该对象的行为更像是类,并且可能存在序列化问题.

scala> object A { 
  def foo() { 
    println("bar baz")
  }
}
defined module A

scala> A.foo()  // static method
bar baz

scala> val a = A  // now we're actually assigning a memory location
a: A.type = A$@7e0babb1

scala> a.foo()  // dereferences a before calling foo
bar baz
Run Code Online (Sandbox Code Playgroud)

  • 在这种情况下,您将在闭包内部实例化一个对象,因此它不会被序列化。这里的`funcs`类在驱动程序JVM上不存在任何形式。 (2认同)
  • 好吧,我想我明白你的意思。如果我在RDD闭包中实例化一个类,则该类是否可序列化并不重要,因为RDD中的每个元素都会创建一个新对象。如果要在RDD闭包中使用一个方法,并且此方法引用在驱动程序中实例化的对象,则仅当类可序列化时才能执行任务。那是对的吗? (2认同)

maa*_*asg 5

为了让 Spark 分发给定的操作,操作中使用的函数需要被序列化。在序列化之前,这些函数通过一个复杂的过程,适当地称为“ ClosureCleaner ”。

目的是从上下文中“切断”闭包,以减少需要序列化的对象图的大小,并降低过程中出现序列化问题的风险。换句话说,确保只有执行函数所需的代码被序列化并发送到“另一端”进行反序列化和执行

在该过程中,闭包也被评估为可序列化,以便在运行时主动检测序列化问题 ( SparkContext#clean )。

该代码既密集又复杂,因此很难找到导致这种情况的正确代码路径。

直觉上,发生的事情是当ClosureCleaner发现:

val result = rdd.map{elem => 
  funcs.func_1(elem)
} 
Run Code Online (Sandbox Code Playgroud)

它评估闭包的内部成员是否来自可以重新创建的对象,并且没有进一步的引用,因此清理后的闭包只包含{elem => funcs.func_1(elem)}可以由JavaSerializer.

相反,当闭包清洁器评估:

val handler = funcs
val result = rdd.map(elem => {
  handler.func_1(elem)
})
Run Code Online (Sandbox Code Playgroud)

它发现闭包有一个对$outer( handler)的引用,因此它检查外部作用域并将 和 变量实例添加到清理过的闭包中。我们可以想象得到的清洁闭包是这种形状的(这仅用于说明目的):

{elem => 
  val handler = funcs
  handler.func_1(elem)
} 
Run Code Online (Sandbox Code Playgroud)

当闭包被测试序列化时,它无法序列化。根据 JVM 序列化规则,如果递归地所有成员都可序列化,则对象是可序列化的。在这种情况下,handler引用了一个不可序列化的对象并且检查失败。