Spark mapPartitions vs transient lazy val

Noa*_*ish 5 dictionary transient partition apache-spark

我想知道使用sparks mapPartitions功能与瞬态lazy val有什么不同.
由于每个分区基本上都在不同的节点上运行,因此将在每个节点上创建一个瞬态延迟val实例(假设它在一个对象中).

例如:

class NotSerializable(v: Int) {
  def foo(a: Int) = ???
}

object OnePerPartition {
  @transient lazy val obj: NotSerializable = new NotSerializable(10)
}

object Test extends App{
    val conf = new SparkConf().setMaster("local[2]").setAppName("test")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1 to 100000)

    rdd.map(OnePerPartition.obj.foo)

    // ---------- VS ----------

    rdd.mapPartitions(itr => {
      val obj = new NotSerializable(10)
      itr.map(obj.foo)
    })
}
Run Code Online (Sandbox Code Playgroud)

有人可能会问,为什么你甚至想它......
我想创建一个普通的容器概念运行在任何泛型集合实现我的逻辑(RDD,List,scalding pipe等),
他们都具有的"地图"一个概念,但是mapPartition是独一无二的spark.

use*_*411 5

首先,您transient lazy在这里不需要。使用object包装器足以完成这项工作,您实际上可以将其编写为:

object OnePerExecutor {
  val obj: NotSerializable = new NotSerializable(10)
}
Run Code Online (Sandbox Code Playgroud)

对象包装和NotSerializable内部初始化之间存在根本的区别mapPartitions。这个:

rdd.mapPartitions(iter => {
  val ns = NotSerializable(1)
  ???
})
Run Code Online (Sandbox Code Playgroud)

NotSerializable每个分区创建一个实例。

另一方面,对象包装器NotSerializable为每个执行者JVM 创建一个实例。结果,此实例:

  • 可用于处理多个分区。
  • 可以由多个执行程序线程同时访问。
  • 使用期限超过函数调用的寿命。

这意味着它应该是线程安全的,并且任何方法调用都应该没有副作用。