当我写RDD转换时,例如
val rdd = sc.parallelise(1 to 1000)
rdd.map(x => x * 3)
Run Code Online (Sandbox Code Playgroud)
我理解x => x * 3只是一个Function1 的closure()需要是Serializable,我想我在某处编辑EDIT:它正好在文档中暗示:http://spark.apache.org/docs/latest/programming-guide. html#passing-functions-to-spark它被"发送"给工人执行.(例如Akka向线路工作人员发送"可执行的一段代码")
它是如何工作的?
有人在聚会上参加评论并说它实际上并没有发送任何序列化的代码,但是因为每个工作者都得到了jar的"副本",所以它只需要引用哪个函数来运行或类似的东西(但是我我不确定我是否正确引用了那个人
我现在对它的实际工作方式感到十分困惑.
所以我的问题是
如何将转型关闭发送给工人?通过akka序列化?或者他们"已经在那里",因为火花将整个超级罐发送给每个工人(听起来不太可能......)
如果是这样,那么罐子的其余部分是如何发送给工人的?这是"cleanupClosure"在做什么?例如,只向工作人员发送相关的字节码而不是整个uberjar?(例如,只有关闭的依赖代码?)
总而言之,在任何时候,确实会激发类似--jars路径中的罐子与工人同步吗?或者它是否向工人发送"恰当数量"的代码?如果确实发送了闭包,是否需要重新计算它们?或者每次安排任务时是否发送关闭任务?对不起,如果这是愚蠢的问题,但我真的不知道.
如果你能得到你的答案,请添加消息来源,我在文档中找不到它,我太谨慎了,只是通过阅读代码来尝试结束它.
闭包肯定是在运行时序列化的。我在 pyspark 和 scala 中看到过很多在运行时出现闭包不可序列化异常的实例。有一个复杂的代码称为
从ClosureCleaner.scala
def clean(
closure: AnyRef,
checkSerializable: Boolean = true,
cleanTransitively: Boolean = true): Unit = {
clean(closure, checkSerializable, cleanTransitively, Map.empty)
}
Run Code Online (Sandbox Code Playgroud)
试图缩小正在序列化的代码。然后,代码通过网络发送 - 如果它是可序列化的。否则会抛出异常。
这是 ClosureCleaner 的另一个摘录,用于检查序列化传入函数的能力:
private def ensureSerializable(func: AnyRef) {
try {
if (SparkEnv.get != null) {
SparkEnv.get.closureSerializer.newInstance().serialize(func)
}
} catch {
case ex: Exception => throw new SparkException("Task not serializable", ex)
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2769 次 |
| 最近记录: |