Spark如何向工人发送关闭?

Era*_*dan 12 apache-spark

当我写RDD转换时,例如

val rdd = sc.parallelise(1 to 1000) 
rdd.map(x => x * 3)
Run Code Online (Sandbox Code Playgroud)

我理解x => x * 3只是一个Function1 的closure()需要是Serializable,我想我在某处编辑EDIT:它正好在文档中暗示:http://spark.apache.org/docs/latest/programming-guide. html#passing-functions-to-spark它被"发送"给工人执行.(例如Akka向线路工作人员发送"可执行的一段代码")

它是如何工作的?

有人在聚会上参加评论并说它实际上并没有发送任何序列化的代码,但是因为每个工作者都得到了jar的"副本",所以它只需要引用哪个函数来运行或类似的东西(但是我我不确定我是否正确引用了那个人

我现在对它的实际工作方式感到十分困惑.

所以我的问题是

  1. 如何将转型关闭发送给工人?通过akka序列化?或者他们"已经在那里",因为火花将整个超级罐发送给每个工人(听起来不太可能......)

  2. 如果是这样,那么罐子的其余部分是如何发送给工人的?这是"cleanupClosure"在做什么?例如,只向工作人员发送相关的字节码而不是整个uberjar?(例如,只有关闭的依赖代码?)

  3. 总而言之,在任何时候,确实会激发类似--jars路径中的罐子与工人同步吗?或者它是否向工人发送"恰当数量"的代码?如果确实发送了闭包,是否需要重新计算它们?或者每次安排任务时是否发送关闭任务?对不起,如果这是愚蠢的问题,但我真的不知道.

如果你能得到你的答案,请添加消息来源,我在文档中找不到它,我太谨慎了,只是通过阅读代码来尝试结束它.

jav*_*dba 2

闭包肯定是在运行时序列化的。我在 pyspark 和 scala 中看到过很多在运行时出现闭包不可序列化异常的实例。有一个复杂的代码称为

ClosureCleaner.scala

def clean(
    closure: AnyRef,
    checkSerializable: Boolean = true,
    cleanTransitively: Boolean = true): Unit = {
  clean(closure, checkSerializable, cleanTransitively, Map.empty)
}
Run Code Online (Sandbox Code Playgroud)

试图缩小正在序列化的代码。然后,代码通过网络发送 - 如果它是可序列化的。否则会抛出异常。

这是 ClosureCleaner 的另一个摘录,用于检查序列化传入函数的能力:

  private def ensureSerializable(func: AnyRef) {
    try {
      if (SparkEnv.get != null) {
        SparkEnv.get.closureSerializer.newInstance().serialize(func)
      }
    } catch {
      case ex: Exception => throw new SparkException("Task not serializable", ex)
    }
  }
Run Code Online (Sandbox Code Playgroud)