Fle*_*eur 0 java serialization apache-spark
我在Spark中得到Task不可序列化的错误.我已经搜索并尝试使用某些帖子中建议的静态函数,但它仍然会出现相同的错误.
代码如下:
public class Rating implements Serializable {
private SparkSession spark;
private SparkConf sparkConf;
private JavaSparkContext jsc;
private static Function<String, Rating> mapFunc;
public Rating() {
mapFunc = new Function<String, Rating>() {
public Rating call(String str) {
return Rating.parseRating(str);
}
};
}
public void runProcedure() {
sparkConf = new SparkConf().setAppName("Filter Example").setMaster("local");
jsc = new JavaSparkContext(sparkConf);
SparkSession spark = SparkSession.builder().master("local").appName("Word Count")
.config("spark.some.config.option", "some-value").getOrCreate();
JavaRDD<Rating> ratingsRDD = spark.read().textFile("sample_movielens_ratings.txt")
.javaRDD()
.map(mapFunc);
}
public static void main(String[] args) {
Rating newRating = new Rating();
newRating.runProcedure();
}
}
Run Code Online (Sandbox Code Playgroud)
我该如何解决这个错误?提前致谢.
ric*_*din 12
显然Rating不能Serializable,因为它包含引用星火结构(即SparkSession,SparkConf等)的属性.
这里的问题在于
JavaRDD<Rating> ratingsRD = spark.read().textFile("sample_movielens_ratings.txt")
.javaRDD()
.map(mapFunc);
Run Code Online (Sandbox Code Playgroud)
如果你看一下定义mapFunc,就会返回一个Rating对象.
mapFunc = new Function<String, Rating>() {
public Rating call(String str) {
return Rating.parseRating(str);
}
};
Run Code Online (Sandbox Code Playgroud)
此函数用于map(Spark术语中的转换)内部.由于转换直接执行到工作节点而不是驱动程序节点,因此它们的代码必须是可序列化的.这迫使Spark尝试序列化Rating类,但这是不可能的.
尝试从中提取所需的功能Rating,并将它们放在不具有任何Spark结构的不同类中.最后,使用这个新类作为mapFunc函数的返回类型.
小智 5
此外,您必须确保类中不包含不可序列化的变量,例如JavaSparkContext和SparkSession。如果您需要包含它们,您应该这样定义:
private transient JavaSparkContext sparkCtx;
private transient SparkSession spark;
Run Code Online (Sandbox Code Playgroud)
祝你好运。
| 归档时间: |
|
| 查看次数: |
5603 次 |
| 最近记录: |