了解Spark序列化

Kra*_*tam 25 lambda serialization apache-spark spark-dataframe

在Spark中,如何知道哪些对象在驱动程序上实例化以及哪些对象在执行程序上实例化,因此如何确定哪些类需要实现Serializable?

Kra*_*tam 42

序列化对象意味着将其状态转换为字节流,以便可以将字节流还原为对象的副本.如果Java对象的类或其任何超类实现java.io.Serializable接口或其子接口java.io.Externalizable,则该对象是可序列化的.

一个类永远不会序列化,只有类的对象被序列化.如果需要通过网络持久化或传输对象,则需要对象序列化.

Class Component            Serialization
instance variable           yes
Static instance variable    no
methods                     no
Static methods              no
Static inner class          no
local variables             no
Run Code Online (Sandbox Code Playgroud)

我们来看一个示例Spark代码并浏览各种场景

public class SparkSample {

      public int instanceVariable                =10 ;
      public static int staticInstanceVariable   =20 ;

      public int run(){

         int localVariable                       =30;

         // create Spark conf
         final SparkConf sparkConf = new SparkConf().setAppName(config.get(JOB_NAME).set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

         // create spark context 
         final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        // read DATA 
        JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD(); 


        // Anonymous class used for lambda implementation
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String s) {
                // How will the listed varibles be accessed in RDD across driver and Executors 
                System.out.println("Output :" + instanceVariable + " " + staticInstanceVariable + " " + localVariable);
                return Arrays.asList(SPACE.split(s)).iterator();
        });

        // SAVE OUTPUT
        words.saveAsTextFile(OUTPUT_PATH));

      }

       // Inner Static class for the funactional interface which can replace the lambda implementation above 
       public static class MapClass extends FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String s) {
                System.out.println("Output :" + instanceVariable + " " + staticInstanceVariable + " " + localVariable);
                return Arrays.asList(SPACE.split(s)).iterator();
        }); 

        public static void main(String[] args) throws Exception {
            JavaWordCount count = new JavaWordCount();
            count.run();
        }
}
Run Code Online (Sandbox Code Playgroud)

内部类对象内的外部类的实例变量的可访问性和可序列化

    Inner class        | Instance Variable (Outer class) | Static Instance Variable (Outer class) | Local Variable (Outer class)

    Anonymous class    | Accessible And Serialized       | Accessible yet not Serialized          | Accessible And Serialized 

    Inner Static class | Not Accessible                  | Accessible yet not Serialized          | Not Accessible 
Run Code Online (Sandbox Code Playgroud)

了解Spark工作时的经验法则是:

  1. 在RDD中编写的所有lambda函数都在驱动程序上实例化,并且对象被序列化并发送给执行程序

  2. 如果在内部类中访问任何外部类实例变量,编译器将应用不同的逻辑来访问它们,因此外部类是否序列化取决于您访问的内容.

  3. 就Java而言,整个争论是关于外类和内部类以及如何访问外部类引用和变量导致序列化问题.

各种场景:

外类在Anonymous类中访问的变量变量:


实例变量(外类)

编译器默认在构造函数的字节码中插入构造函数

匿名类,引用外部类对象.

外部类对象用于访问实例变量

匿名级(){

 final Outer-class reference;

 Anonymous-class( Outer-class outer-reference){

reference = outer-reference;

}
Run Code Online (Sandbox Code Playgroud)

}

外部类被序列化并与内部匿名类的序列化对象一起发送


静态实例变量(外类)

由于静态变量未序列化,外部类对象仍会插入到Anonymous类构造函数中.

静态变量的值取自类状态

出现在执行人身上.


局部变量(外类)

编译器默认在构造函数的字节码中插入构造函数

匿名类,引用外部类对象和局部变量引用.

外部类对象用于访问实例变量

匿名级(){

 final Outer-class reference;

final Local-variable localRefrence ;

 Anonymous-class( Outer-class outer-reference, Local-variable localRefrence){

reference = outer-reference;

this.localRefrence = localRefrence;

}
Run Code Online (Sandbox Code Playgroud)

}

外部类是序列化的,局部变量对象也是

序列化并与内部匿名类的序列化对象一起发送

由于局部变量成为匿名类中的实例成员,因此需要对其进行序列化.从外部类的角度来看,局部变量永远不能被序列化

----------

使用Static内部类访问的外部类变量.

实例变量(外类)

无法访问


局部变量(外类)

无法访问


静态实例变量(外类)

由于静态变量未被序列化,因此没有外部类对象被序列化.

静态变量的值取自类状态

出现在执行人身上.

外部类没有序列化,并与序列化的静态内部类一起发送


思考点:

  1. 遵循Java序列化规则来选择需要序列化的类对象.

  2. 使用javap -p -c"abc.class"解包字节代码并查看编译器生成的代码

  3. 根据您在外部类的内部类中尝试访问的内容,编译器会生成不同的字节代码.

  4. 您不需要使类实现只能在驱动程序上访问的序列化.

  5. 在RDD中使用的任何匿名/静态类(所有lambda函数都是匿名类)将在驱动程序上实例化.

  6. RDD中使用的任何类/变量都将在驱动程序上实例化并发送给执行程序.

  7. 声明为transient的任何实例变量都不会在驱动程序上序列化.

    1. 默认情况下,匿名类将强制您使外部类可序列化.
    2. 任何局部变量/对象都不必是可序列化的.
    3. 只有在Anonymous类中使用局部变量时才需要序列化
    4. 可以在pair,mapToPair函数的call()方法中创建单例,从而确保它永远不会在驱动程序上初始化
    5. 静态变量永远不会被序列化,因此永远不会从驱动程序发送到执行程序
  8. 如果你需要只在执行程序上执行任何服务,在lambda函数中使它们成为静态字段,或者使它们成为瞬态和singelton并检查null条件以实例化它们


lin*_*hrr 5

有很多写得很好的博客很好地解释了这一点,就像这个:火花序列化挑战

但简而言之,我们可以这样得出结论(仅限 Spark,而不是一般的 JVM):

  1. 由于JVM,只能序列化对象(函数是对象)
  2. 如果一个对象需要被序列化,它的父对象也需要被序列化
  3. 任何 Spark 操作,如(map、flatMap、filter、foreachPartition、mapPartition 等),如果内部部分引用了外部部分对象,则该对象需要序列化。因为外部对象在驱动程序中,而不是在执行程序中。序列化政策是指我的观点#2。
  4. 对 Scala object(又名 Scala 单例)的引用不会被序列化(仅对于 mapPartition 和 foreachPartition,UDF 将始终从驱动程序到执行程序获得 serde)。执行器将直接引用他们本地 JVM 的对象,因为它是一个单例,它将存在于执行器 JVM 上。这意味着,驱动程序在其本地的突变object不会被执行者看到。