Spark NotSerializableException

Tags: java, hadoop, apache-spark

In my Spark code, I am trying to create an IndexedRowMatrix from a CSV file. However, I get the following error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext

Here is my code:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.linalg.DenseVector;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.distributed.IndexedRow;
import scala.Tuple2;

sc = new JavaSparkContext("local", "App",
        "/srv/spark", new String[]{"target/App.jar"});

JavaRDD<String> csv = sc.textFile("data/matrix.csv").cache();

JavaRDD<IndexedRow> entries = csv.zipWithIndex().map(
        new Function<Tuple2<String, Long>, IndexedRow>() {

            private static final long serialVersionUID = 4795273163954440089L;

            @Override
            public IndexedRow call(Tuple2<String, Long> tuple)
                    throws Exception {
                String line = tuple._1();
                long index = tuple._2();
                // Parse one comma-separated line into a dense vector.
                String[] strings = line.split(",");
                double[] doubles = new double[strings.length];
                for (int i = 0; i < strings.length; i++) {
                    doubles[i] = Double.parseDouble(strings[i]);
                }
                Vector v = new DenseVector(doubles);
                return new IndexedRow(index, v);
            }
        });

Answer:

Something smells fishy here; if you show us more of your code, we may be able to give a better answer.

In any case, you can try creating a public class in a separate file to serve as the mapper function:

import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.linalg.DenseVector;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.distributed.IndexedRow;
import scala.Tuple2;

public class Mapper implements Function<Tuple2<String, Long>, IndexedRow> {

  @Override
  public IndexedRow call(Tuple2<String, Long> tuple) throws Exception {
    String line = tuple._1();
    long index = tuple._2();
    // Parse one comma-separated line into a dense vector.
    String[] strings = line.split(",");
    double[] doubles = new double[strings.length];
    for (int i = 0; i < strings.length; i++) {
      doubles[i] = Double.parseDouble(strings[i]);
    }
    Vector v = new DenseVector(doubles);
    return new IndexedRow(index, v);
  }
}

Then use it to map your JavaRDD:

JavaRDD<String> csv = sc.textFile("data/matrix.csv").cache();
JavaRDD<IndexedRow> entries = csv.zipWithIndex().map(new Mapper());

This way, Spark only needs to serialize the Mapper class for the map() call, and Mapper contains no non-serializable fields.
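
To see why the original version fails: in Java, an anonymous inner class created in an instance context holds an implicit reference to its enclosing object. A minimal sketch of the problematic pattern (the App class and its field are assumptions for illustration, not the asker's actual code):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class App {                      // hypothetical driver class
  private JavaSparkContext sc;          // not serializable

  public void run() {
    JavaRDD<String> csv = sc.textFile("data/matrix.csv");
    // The anonymous Function captures App.this, so Spark must
    // serialize App -- including the JavaSparkContext field --
    // which throws NotSerializableException.
    JavaRDD<Integer> lengths = csv.map(new Function<String, Integer>() {
      @Override
      public Integer call(String line) {
        return line.length();
      }
    });
    lengths.count();
  }
}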

However, the job may still fail for other reasons we cannot know about, since we cannot see all of the code involved.
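
As a side note, a static nested class would achieve the same effect while keeping everything in one file, since static nested classes hold no implicit reference to an enclosing instance. A minimal sketch of that alternative (class names are illustrative):

import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.linalg.DenseVector;
import org.apache.spark.mllib.linalg.distributed.IndexedRow;
import scala.Tuple2;

public class App {
  // Static nested class: no hidden App.this reference, so only
  // the Mapper instance itself is serialized for the map() call.
  public static class Mapper
      implements Function<Tuple2<String, Long>, IndexedRow> {
    @Override
    public IndexedRow call(Tuple2<String, Long> tuple) throws Exception {
      String[] strings = tuple._1().split(",");
      double[] doubles = new double[strings.length];
      for (int i = 0; i < strings.length; i++) {
        doubles[i] = Double.parseDouble(strings[i]);
      }
      return new IndexedRow(tuple._2(), new DenseVector(doubles));
    }
  }
}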