在Spark中,在所有工作者上拥有静态对象的正确方法是什么?

Dan*_*don 17 scala apache-spark

我一直在看Spark的文档,它提到了这个:

Spark的API在很大程度上依赖于在驱动程序中传递函数以在集群上运行.有两种建议的方法可以做到这一点:

匿名函数语法,可用于短代码.全局单例对象中的静态方法.例如,您可以定义对象MyFunctions,然后传递MyFunctions.func1,如下所示:

object MyFunctions {   def func1(s: String): String = { ... } }

myRdd.map(MyFunctions.func1) 
Run Code Online (Sandbox Code Playgroud)

请注意,虽然也可以将引用传递给类实例中的方法(而不是单例对象),但这需要发送包含该类的对象以及方法.例如,考虑:

class MyClass {   
  def func1(s: String): String = { ... }   
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) } 
} 
Run Code Online (Sandbox Code Playgroud)

在这里,如果我们创建一个新的MyClass并在其上调用doStuff,那里的map会引用该MyClass实例的func1方法,因此需要将整个对象发送到集群.它类似于写作 rdd.map(x => this.func1(x)).

现在我的疑问是,如果你对单例对象(它应该等同于静态)有属性会发生什么.相同的例子有一个小的改动:

object MyClass {   
  val value = 1   
  def func1(s: String): String = { s + value }   
} 

myRdd.map(MyClass.func1) 
Run Code Online (Sandbox Code Playgroud)

所以该函数仍然是静态引用的,但是Spark尝试序列化所有引用的变量会走多远?它会序列化value还是会在远程工作者中再次初始化?

另外,这一切都在我在单例对象中有一些重型模型的上下文中,我想找到将它们序列化到工作者的正确方法,同时保持从单独的单例引用它们的能力,而不是将它们传递给跨深度函数调用堆栈的函数参数.

有关Spark序列化事物的内容/方式/时间的任何深入信息将不胜感激.

van*_*nza 15

这不是关于Spark的问题,而是关于Scala如何生成代码的更多问题.请记住,Scala object几乎是一个充满静态方法的Java类.考虑一个这样的简单示例:

object foo {

  val value = 42

  def func(i: Int): Int = i + value

  def main(args: Array[String]): Unit = {
    println(Seq(1, 2, 3).map(func).sum)
  }

}
Run Code Online (Sandbox Code Playgroud)

这将被翻译成3个Java类; 其中一个将是闭包,它是map方法的参数.javap在该类上使用产生如下内容:

public final class foo$$anonfun$main$1 extends scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable {
  public static final long serialVersionUID;
  public final int apply(int);
  public int apply$mcII$sp(int);
  public final java.lang.Object apply(java.lang.Object);
  public foo$$anonfun$main$1();
}
Run Code Online (Sandbox Code Playgroud)

请注意,没有字段或任何内容.如果你看一下反汇编的字节码,它所做的只是调用func()方法.在Spark中运行时,这是将被序列化的实例; 因为它没有字段,所以没有太多的序列化.

至于你的问题,如何初始化静态对象,你可以拥有一个幂等初始化函数,你可以在闭包开始时调用它.第一个将触发初始化,后续调用将是no-ops.但是,清理是非常棘手的,因为我不熟悉一个像"在所有执行程序上运行此代码"之类的API.

本博客在"setup()和cleanup()"部分中介绍了一种需要清理时非常有用的方法.

编辑:只是为了澄清,这里是实际进行调用的方法的反汇编.

public int apply$mcII$sp(int);
  Code:
   0:   getstatic       #29; //Field foo$.MODULE$:Lfoo$;
   3:   iload_1
   4:   invokevirtual   #32; //Method foo$.func:(I)I
   7:   ireturn
Run Code Online (Sandbox Code Playgroud)

看看它如何引用持有单例的静态字段并调用该func()方法.