Noa*_*ish 5 dictionary transient partition apache-spark
我想知道使用sparks mapPartitions
功能与瞬态lazy val有什么不同.
由于每个分区基本上都在不同的节点上运行,因此将在每个节点上创建一个瞬态延迟val实例(假设它在一个对象中).
例如:
class NotSerializable(v: Int) {
def foo(a: Int) = ???
}
object OnePerPartition {
@transient lazy val obj: NotSerializable = new NotSerializable(10)
}
object Test extends App{
val conf = new SparkConf().setMaster("local[2]").setAppName("test")
val sc = new SparkContext(conf)
val rdd: RDD[Int] = sc.parallelize(1 to 100000)
rdd.map(OnePerPartition.obj.foo)
// ---------- VS ----------
rdd.mapPartitions(itr => {
val obj = new NotSerializable(10)
itr.map(obj.foo)
})
}
Run Code Online (Sandbox Code Playgroud)
有人可能会问,为什么你甚至想它......
我想创建一个普通的容器概念运行在任何泛型集合实现我的逻辑(RDD
,List
,scalding pipe
等),
他们都具有的"地图"一个概念,但是mapPartition
是独一无二的spark
.
首先,您transient
lazy
在这里不需要。使用object
包装器足以完成这项工作,您实际上可以将其编写为:
object OnePerExecutor {
val obj: NotSerializable = new NotSerializable(10)
}
Run Code Online (Sandbox Code Playgroud)
对象包装和NotSerializable
内部初始化之间存在根本的区别mapPartitions
。这个:
rdd.mapPartitions(iter => {
val ns = NotSerializable(1)
???
})
Run Code Online (Sandbox Code Playgroud)
NotSerializable
每个分区创建一个实例。
另一方面,对象包装器NotSerializable
为每个执行者JVM 创建一个实例。结果,此实例:
这意味着它应该是线程安全的,并且任何方法调用都应该没有副作用。