Kra*_*tam 25 lambda serialization apache-spark spark-dataframe
在Spark中,如何知道哪些对象在驱动程序上实例化以及哪些对象在执行程序上实例化,因此如何确定哪些类需要实现Serializable?
Kra*_*tam 42
序列化对象意味着将其状态转换为字节流,以便可以将字节流还原为对象的副本.如果Java对象的类或其任何超类实现java.io.Serializable接口或其子接口java.io.Externalizable,则该对象是可序列化的.
一个类永远不会序列化,只有类的对象被序列化.如果需要通过网络持久化或传输对象,则需要对象序列化.
Class Component Serialization
instance variable yes
Static instance variable no
methods no
Static methods no
Static inner class no
local variables no
Run Code Online (Sandbox Code Playgroud)
我们来看一个示例Spark代码并浏览各种场景
public class SparkSample {
public int instanceVariable =10 ;
public static int staticInstanceVariable =20 ;
public int run(){
int localVariable =30;
// create Spark conf
final SparkConf sparkConf = new SparkConf().setAppName(config.get(JOB_NAME).set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
// create spark context
final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
// read DATA
JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
// Anonymous class used for lambda implementation
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) {
// How will the listed varibles be accessed in RDD across driver and Executors
System.out.println("Output :" + instanceVariable + " " + staticInstanceVariable + " " + localVariable);
return Arrays.asList(SPACE.split(s)).iterator();
});
// SAVE OUTPUT
words.saveAsTextFile(OUTPUT_PATH));
}
// Inner Static class for the funactional interface which can replace the lambda implementation above
public static class MapClass extends FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) {
System.out.println("Output :" + instanceVariable + " " + staticInstanceVariable + " " + localVariable);
return Arrays.asList(SPACE.split(s)).iterator();
});
public static void main(String[] args) throws Exception {
JavaWordCount count = new JavaWordCount();
count.run();
}
}
Run Code Online (Sandbox Code Playgroud)
内部类对象内的外部类的实例变量的可访问性和可序列化
Inner class | Instance Variable (Outer class) | Static Instance Variable (Outer class) | Local Variable (Outer class)
Anonymous class | Accessible And Serialized | Accessible yet not Serialized | Accessible And Serialized
Inner Static class | Not Accessible | Accessible yet not Serialized | Not Accessible
Run Code Online (Sandbox Code Playgroud)
了解Spark工作时的经验法则是:
在RDD中编写的所有lambda函数都在驱动程序上实例化,并且对象被序列化并发送给执行程序
如果在内部类中访问任何外部类实例变量,编译器将应用不同的逻辑来访问它们,因此外部类是否序列化取决于您访问的内容.
就Java而言,整个争论是关于外类和内部类以及如何访问外部类引用和变量导致序列化问题.
各种场景:
编译器默认在构造函数的字节码中插入构造函数
匿名类,引用外部类对象.
外部类对象用于访问实例变量
匿名级(){
final Outer-class reference;
Anonymous-class( Outer-class outer-reference){
reference = outer-reference;
}
Run Code Online (Sandbox Code Playgroud)
}
外部类被序列化并与内部匿名类的序列化对象一起发送
由于静态变量未序列化,外部类对象仍会插入到Anonymous类构造函数中.
静态变量的值取自类状态
出现在执行人身上.
编译器默认在构造函数的字节码中插入构造函数
匿名类,引用外部类对象和局部变量引用.
外部类对象用于访问实例变量
匿名级(){
final Outer-class reference;
final Local-variable localRefrence ;
Anonymous-class( Outer-class outer-reference, Local-variable localRefrence){
reference = outer-reference;
this.localRefrence = localRefrence;
}
Run Code Online (Sandbox Code Playgroud)
}
外部类是序列化的,局部变量对象也是
序列化并与内部匿名类的序列化对象一起发送
由于局部变量成为匿名类中的实例成员,因此需要对其进行序列化.从外部类的角度来看,局部变量永远不能被序列化
无法访问
无法访问
由于静态变量未被序列化,因此没有外部类对象被序列化.
静态变量的值取自类状态
出现在执行人身上.
外部类没有序列化,并与序列化的静态内部类一起发送
思考点:
遵循Java序列化规则来选择需要序列化的类对象.
使用javap -p -c"abc.class"解包字节代码并查看编译器生成的代码
根据您在外部类的内部类中尝试访问的内容,编译器会生成不同的字节代码.
您不需要使类实现只能在驱动程序上访问的序列化.
在RDD中使用的任何匿名/静态类(所有lambda函数都是匿名类)将在驱动程序上实例化.
RDD中使用的任何类/变量都将在驱动程序上实例化并发送给执行程序.
声明为transient的任何实例变量都不会在驱动程序上序列化.
有很多写得很好的博客很好地解释了这一点,就像这个:火花序列化挑战。
但简而言之,我们可以这样得出结论(仅限 Spark,而不是一般的 JVM):
object(又名 Scala 单例)的引用不会被序列化(仅对于 mapPartition 和 foreachPartition,UDF 将始终从驱动程序到执行程序获得 serde)。执行器将直接引用他们本地 JVM 的对象,因为它是一个单例,它将存在于执行器 JVM 上。这意味着,驱动程序在其本地的突变object不会被执行者看到。