了解Spark的闭包及其序列化

Pav*_*nov 17 java serialization closures apache-spark

免责声明:刚开始玩Spark.

我很难理解着名的"任务不可序列化"的例外,但我的问题与我在SO上看到的有点不同(或者我认为).

我有一个很小的自定义RDD(TestRDD).它有一个字段,用于存储其类未实现Serializable(NonSerializable)的对象.我已经设置了"spark.serializer"配置选项来使用Kryo.但是,当我尝试count()使用RDD时,我得到以下信息:

Caused by: java.io.NotSerializableException: com.complexible.spark.NonSerializable
Serialization stack:
- object not serializable (class: com.test.spark.NonSerializable, value: com.test.spark.NonSerializable@2901e052)
- field (class: com.test.spark.TestRDD, name: mNS, type: class com.test.spark.NonSerializable)
- object (class com.test.spark.TestRDD, TestRDD[1] at RDD at TestRDD.java:28)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (TestRDD[1] at RDD at TestRDD.java:28,<function2>))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1009)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933)
Run Code Online (Sandbox Code Playgroud)

当我查看内部时,DAGScheduler.submitMissingTasks我发现它在我的RDD上使用了闭包序列化程序,这是Java序列化程序,而不是我期望的Kryo序列化程序.我已经读过Kryo有序列化闭包的问题,​​而Spark总是使用Java序列化程序进行闭包,但我不太明白闭包是如何发挥作用的.我在这里所做的就是:

SparkConf conf = new SparkConf()
                         .setAppName("ScanTest")
                         .setMaster("local")
                         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

JavaSparkContext sc = new JavaSparkContext(conf);

TestRDD rdd = new TestRDD(sc.sc());
System.err.println(rdd.count());
Run Code Online (Sandbox Code Playgroud)

也就是说,没有映射器或任何需要序列化闭包的东西.OTOH这是有效的:

sc.parallelize(Arrays.asList(new NonSerializable(), new NonSerializable())).count()
Run Code Online (Sandbox Code Playgroud)

Kryo序列化器按预期使用,不涉及闭合序列化器.如果我没有将序列化程序属性设置为Kryo,我也会在这里得到一个例外.

我很感激任何指针解释闭包的来源以及如何确保我可以使用Kryo来序列化自定义RDD.

更新:这里TestRDD有不可序列化的字段mNS:

class TestRDD extends RDD<String> {

    private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class);

    NonSerializable mNS = new NonSerializable();

    public TestRDD(final SparkContext _sc) {
        super(_sc,
              JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()),
              STRING_TAG);
    }

    @Override
    public Iterator<String> compute(final Partition thePartition, final TaskContext theTaskContext) {
        return JavaConverters.asScalaIteratorConverter(Arrays.asList("test_" + thePartition.index(),
                                                                     "test_" + thePartition.index(),
                                                                     "test_" + thePartition.index()).iterator()).asScala();
    }

    @Override
    public Partition[] getPartitions() {
        return new Partition[] {new TestPartition(0), new TestPartition(1), new TestPartition(2)};
    }

    static class TestPartition implements Partition {

        final int mIndex;

        public TestPartition(final int theIndex) {
            mIndex = theIndex;
        }

        public int index() {
            return mIndex;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

Yuv*_*kov 8

当我查看内部时,DAGScheduler.submitMissingTasks我发现它在我的RDD上使用了闭包序列化程序,这是Java序列化程序,而不是我期望的Kryo序列化程序.

SparkEnv支持两个序列化程序,一个名称serializer用于序列化数据,检查点,工作人员之间的消息传递等,并在spark.serializer配置标志下可用.另一种是所谓closureSerializerspark.closure.serializer这是用来检查你的对象实际上是序列化和可配置为星火<= 1.6.2(但没有什么比其他JavaSerializer实际工作)和2.0.0及以上的硬编码JavaSerializer.

Kryo闭包序列化程序有一个错误,使其无法使用,您可以在SPARK-7708下看到该错误(这可能与Kryo 3.0.0一起修复,但Spark目前已修复,其中特定版本的Chill已在Kryo 2.2上修复. 1).此外,对于Spark 2.0.x,JavaSerializer现在已修复,而不是可配置(您可以在此拉取请求中看到它).这意味着我们有效地坚持JavaSerializer封闭序列化.

我们使用一个序列化程序来提交任务和其他工序之间的序列化数据是不是很奇怪?当然,但这就是我们所拥有的.

总而言之,如果您正在设置spark.serializer配置,或者SparkContext.registerKryoClasses您正在使用Kryo来完成Spark中的大部分序列化.话虽如此,为了检查给定的类是否可序列化并将任务序列化到工作者,Spark将使用JavaSerializer.

  • @PavelKlinov如果您有任何不可序列化的属性,通常的做法是将其标记为`@ transient`并让工作人员懒惰地加载它. (2认同)