我应该将变量保留为瞬态变量吗?

Aki*_*i K 3 java serialization transient apache-spark

我一直在试验Apache Spark试图解决一些查询,如top-k,skyline等.

我做了一个包装SparkConfJavaSparkContext命名的包装器SparkContext.这个类也实现了serializable,但是既然SparkConfJavaSparkContext不可序列化,那么类也不是.

我有一个类解决名为topK的查询TopK,该类实现了serializable,但该类也有一个SparkContext不可序列化的成员变量(由于上述原因).因此,每当我尝试TopK.reduce()RDD中的函数内执行方法时,我都会收到异常.

我发现的解决方案是SparkContext短暂的.

我的问题是:我应该将SparkContext变量保持为瞬态还是我犯了一个大错误?

SparkContext 类:

import java.io.Serializable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;

public class SparkContext implements Serializable {

    private final SparkConf sparConf; // this is not serializable
    private final JavaSparkContext sparkContext; // this is not either

    protected SparkContext(String appName, String master) {
        this.sparConf = new SparkConf();
        this.sparConf.setAppName(appName);
        this.sparConf.setMaster(master);

        this.sparkContext = new JavaSparkContext(sparConf);
    }

    protected JavaRDD<String> textFile(String path) {
        return sparkContext.textFile(path);
    }

}
Run Code Online (Sandbox Code Playgroud)

TopK 类:

public class TopK implements QueryCalculator, Serializable {

    private final transient SparkContext sparkContext;
    .
    .
    .
}
Run Code Online (Sandbox Code Playgroud)

抛出Task not serializable异常的示例. getBiggestPointByXDimension甚至不会输入,因为为了在reduce函数中执行它,包含it(TopK)的类必须是可序列化的.

private Point findMedianPoint(JavaRDD<Point> points) {
    Point biggestPointByXDimension = points.reduce((a, b) -> getBiggestPointByXDimension(a, b));
    .
    .
    .
}

private Point getBiggestPointByXDimension(Point first, Point second) {
        return first.getX() > second.getX() ? first : second;
    }
Run Code Online (Sandbox Code Playgroud)

maa*_*asg 6

问题:我应该将SparkContext变量保持为瞬态吗?

是.没关系.它只封装了(Java)SparkContext,并且上下文在worker上不可用,因此标记它transient只是告诉Serializer不要序列化该字段.

您也可以让自己的SparkContext包装器不可序列化并将其标记为瞬态 - 与上述效果相同.(顺便说一下,鉴于SparkContext是spark上下文的Scala类名,我选择了另一个名称来避免混乱.)

还有一件事:正如你所指出的,Spark试图序列化完整的封闭类的原因是因为在闭包中使用了类的方法.避免这样!使用匿名类或自包含的闭包(最后将转换为匿名类).