Aki*_*i K 3 java serialization transient apache-spark
我一直在试验Apache Spark试图解决一些查询,如top-k,skyline等.
我做了一个包装SparkConf和JavaSparkContext命名的包装器SparkContext.这个类也实现了serializable,但是既然SparkConf又JavaSparkContext不可序列化,那么类也不是.
我有一个类解决名为topK的查询TopK,该类实现了serializable,但该类也有一个SparkContext不可序列化的成员变量(由于上述原因).因此,每当我尝试TopK从.reduce()RDD中的函数内执行方法时,我都会收到异常.
我发现的解决方案是SparkContext短暂的.
我的问题是:我应该将SparkContext变量保持为瞬态还是我犯了一个大错误?
SparkContext 类:
import java.io.Serializable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
public class SparkContext implements Serializable {
private final SparkConf sparConf; // this is not serializable
private final JavaSparkContext sparkContext; // this is not either
protected SparkContext(String appName, String master) {
this.sparConf = new SparkConf();
this.sparConf.setAppName(appName);
this.sparConf.setMaster(master);
this.sparkContext = new JavaSparkContext(sparConf);
}
protected JavaRDD<String> textFile(String path) {
return sparkContext.textFile(path);
}
}
Run Code Online (Sandbox Code Playgroud)
TopK 类:
public class TopK implements QueryCalculator, Serializable {
private final transient SparkContext sparkContext;
.
.
.
}
Run Code Online (Sandbox Code Playgroud)
抛出Task not serializable异常的示例. getBiggestPointByXDimension甚至不会输入,因为为了在reduce函数中执行它,包含it(TopK)的类必须是可序列化的.
private Point findMedianPoint(JavaRDD<Point> points) {
Point biggestPointByXDimension = points.reduce((a, b) -> getBiggestPointByXDimension(a, b));
.
.
.
}
private Point getBiggestPointByXDimension(Point first, Point second) {
return first.getX() > second.getX() ? first : second;
}
Run Code Online (Sandbox Code Playgroud)
问题:我应该将SparkContext变量保持为瞬态吗?
是.没关系.它只封装了(Java)SparkContext,并且上下文在worker上不可用,因此标记它transient只是告诉Serializer不要序列化该字段.
您也可以让自己的SparkContext包装器不可序列化并将其标记为瞬态 - 与上述效果相同.(顺便说一下,鉴于SparkContext是spark上下文的Scala类名,我选择了另一个名称来避免混乱.)
还有一件事:正如你所指出的,Spark试图序列化完整的封闭类的原因是因为在闭包中使用了类的方法.避免这样!使用匿名类或自包含的闭包(最后将转换为匿名类).
| 归档时间: |
|
| 查看次数: |
3589 次 |
| 最近记录: |