Aki*_*i K 3 java serialization transient apache-spark
我一直在试验Apache Spark试图解决一些查询,如top-k,skyline等.
我做了一个包装SparkConf
和JavaSparkContext
命名的包装器SparkContext
.这个类也实现了serializable,但是既然SparkConf
又JavaSparkContext
不可序列化,那么类也不是.
我有一个类解决名为topK的查询TopK
,该类实现了serializable,但该类也有一个SparkContext
不可序列化的成员变量(由于上述原因).因此,每当我尝试TopK
从.reduce()
RDD中的函数内执行方法时,我都会收到异常.
我发现的解决方案是SparkContext
短暂的.
我的问题是:我应该将SparkContext
变量保持为瞬态还是我犯了一个大错误?
SparkContext
类:
import java.io.Serializable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
public class SparkContext implements Serializable {
private final SparkConf sparConf; // this is not serializable
private final JavaSparkContext sparkContext; // this is not either
protected SparkContext(String appName, String master) {
this.sparConf = new SparkConf();
this.sparConf.setAppName(appName);
this.sparConf.setMaster(master);
this.sparkContext = new JavaSparkContext(sparConf);
}
protected JavaRDD<String> textFile(String path) {
return sparkContext.textFile(path);
}
}
Run Code Online (Sandbox Code Playgroud)
TopK
类:
public class TopK implements QueryCalculator, Serializable {
private final transient SparkContext sparkContext;
.
.
.
}
Run Code Online (Sandbox Code Playgroud)
抛出Task not serializable
异常的示例. getBiggestPointByXDimension
甚至不会输入,因为为了在reduce函数中执行它,包含it(TopK
)的类必须是可序列化的.
private Point findMedianPoint(JavaRDD<Point> points) {
Point biggestPointByXDimension = points.reduce((a, b) -> getBiggestPointByXDimension(a, b));
.
.
.
}
private Point getBiggestPointByXDimension(Point first, Point second) {
return first.getX() > second.getX() ? first : second;
}
Run Code Online (Sandbox Code Playgroud)
问题:我应该将SparkContext变量保持为瞬态吗?
是.没关系.它只封装了(Java)SparkContext,并且上下文在worker上不可用,因此标记它transient
只是告诉Serializer不要序列化该字段.
您也可以让自己的SparkContext
包装器不可序列化并将其标记为瞬态 - 与上述效果相同.(顺便说一下,鉴于SparkContext是spark上下文的Scala类名,我选择了另一个名称来避免混乱.)
还有一件事:正如你所指出的,Spark试图序列化完整的封闭类的原因是因为在闭包中使用了类的方法.避免这样!使用匿名类或自包含的闭包(最后将转换为匿名类).
归档时间: |
|
查看次数: |
3589 次 |
最近记录: |