use*_*691 5 java hadoop apache-spark
在我的Spark代码中,我试图从csv文件创建一个IndexedRowMatrix.但是,我收到以下错误:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
Run Code Online (Sandbox Code Playgroud)
这是我的代码:
sc = new JavaSparkContext("local", "App",
"/srv/spark", new String[]{"target/App.jar"});
JavaRDD<String> csv = sc.textFile("data/matrix.csv").cache();
JavaRDD<IndexedRow> entries = csv.zipWithIndex().map(
new Function<scala.Tuple2<String, Long>, IndexedRow>() {
/**
*
**/
private static final long serialVersionUID = 4795273163954440089L;
@Override
public IndexedRow call(Tuple2<String, Long> tuple)
throws Exception {
String line = tuple._1;
long index = tuple._2;
String[] strings = line.split(",");
double[] doubles = new double[strings.length];
for (int i = 0; i < strings.length; i++) {
doubles[i] = Double.parseDouble(strings[i]);
}
Vector v = new DenseVector(doubles);
return new IndexedRow(index, v);
}
});
Run Code Online (Sandbox Code Playgroud)
有些东西闻起来很可疑,如果您向我们展示更多代码,也许我们可以给出更好的答案。
无论如何,您可以尝试在代表映射器函数的单独文件中创建一个公共类:
public class Mapper implements Function<Tuple2<String,Long>, IndexedRow> {
@Override
public IndexedRow call(Tuple2<String, Long> tuple) throws Exception {
String line = tuple._1();
long index = tuple._2();
String[] strings = line.split(",");
double[] doubles = new double[strings.length];
for (int i = 0; i < strings.length; i++) {
doubles[i] = Double.parseDouble(strings[i]);
}
Vector v = new DenseVector(doubles);
return new IndexedRow(index, v);
}
}
Run Code Online (Sandbox Code Playgroud)
然后用它来映射你的 JavaRDD:
JavaRDD<String> csv = jsc.textFile("data/matrix.csv").cache();
JavaRDD<IndexedRow> entries = csv.zipWithIndex().map(new Mapper());
Run Code Online (Sandbox Code Playgroud)
这样,对于 map() 调用,Spark 只需要序列化 Mapper 类,其中不包含任何不可序列化的属性。
然而,该作业可能会由于我们无法知道的其他原因而失败,因为我们看不到所涉及的所有代码。
归档时间: |
|
查看次数: |
2061 次 |
最近记录: |