Spark - 使用不可序列化的成员序列化对象

sum*_*ulb 9 java serialization scala kryo apache-spark

我将在Spark的上下文中提出这个问题,因为这就是我所面临的问题,但这可能是一个普通的Java问题.

在我们的火花工作中,我们Resolver需要在所有工人中使用它(它在udf中使用).问题是它不可序列化,我们无法改变它.解决方案是将其作为序列化的另一个类的成员.

所以我们最终得到:

public class Analyzer implements Serializable {
    transient Resolver resolver;

    public Analyzer() {
        System.out.println("Initializing a Resolver...");
        resolver = new Resolver();
    }

    public int resolve(String key) {
         return resolver.find(key);
    }
}
Run Code Online (Sandbox Code Playgroud)

然后我们broadcast使用Spark API这个类:

 val analyzer = sparkContext.broadcast(new Analyzer())
Run Code Online (Sandbox Code Playgroud)

(有关Spark广播的更多信息,请点击此处)

然后,我们继续analyzer在UDF中使用,作为我们的火花代码的一部分,具体如下:

val resolve = udf((key: String) => analyzer.value.resolve(key))
val result = myDataFrame.select("key", resolve("key")).count()
Run Code Online (Sandbox Code Playgroud)

这一切都按预期工作,但让我们感到疑惑.

Resolver没有实现Serializable,因此标记为transient- 意味着它不会与它的所有者对象一起序列化Analyzer.

但是正如您从上面的代码中可以清楚地看到的那样,该resolve()方法使用resolver,因此它不能为null.事实上并非如此.代码有效.

因此,如果字段未通过序列化传递,那么resolver成员如何实例化?

我最初的想法是,可能在Analyzer接收端(即火花工人)调用构造函数,但是我希望看到该行"Initializing a Resolver..."打印几次.但它只打印一次,这可能表明它只被调用一次,就在它传递给广播API之前.那么为什么不是resolvernull?

我是否遗漏了有关JVM序列化或Spark序列化的内容?

这段代码怎么工作?

Spark在cluster模式下在YARN上运行. spark.serializer设置为org.apache.spark.serializer.KryoSerializer.

Yuv*_*kov 3

那么如果字段不通过序列化传递,那么resolver成员是如何实例化的呢?

当调用时,它是通过构造函数调用 ( new Resolver)实例化的kryo.readObject

kryo.readClassAndObject(input).asInstanceOf[T]
Run Code Online (Sandbox Code Playgroud)

我最初的想法是,可能在接收端(即 Spark Worker)调用分析器构造函数,但随后我希望看到“初始化解析器...”这一行被打印多次。但它只打印一次,这可能表明它只被调用一次

这不是广播变量的工作原理。发生的情况是,当每个 Executor 需要作用域中的广播变量时​​,它首先检查其内存中是否有该对象BlockManager,如果没有,它会询问驱动程序或邻居执行程序(如果该执行程序上有多个执行程序)相同的 Worker 节点)为其缓存的实例,它们将其序列化并将其发送给他,然后他接收该实例并将其缓存在自己的BlockManager.

TorrentBroadcast这记录在(默认的广播实现)的行为中:

* The driver divides the serialized object into small chunks and
* stores those chunks in the BlockManager of the driver.
*
* On each executor, the executor first attempts to fetch the object from its BlockManager. If
* it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
* other executors if available. Once it gets the chunks, it puts the chunks in its own
* BlockManager, ready for other executors to fetch from.
*
* This prevents the driver from being the bottleneck in sending out multiple copies of the
* broadcast data (one per executor).
Run Code Online (Sandbox Code Playgroud)

如果我们删除瞬态,它就会失败,并且堆栈跟踪会导致 Kryo

这是因为您的类中可能有一个字段,无论属性Resolver如何,即使 Kryo 也无法序列化Serializable