尝试使用 System.out 作为 RDD 中的任务

ash*_*der 1 java serialization task apache-spark

我目前刚刚开始学习 Apache Spark,并且有一些代码我不太明白为什么无法编译。它说我发送到 myRDD forEach 的任务不可序列化,但是我正在观看的教程也做了类似的事情。任何想法或线索将不胜感激。

public class Main {
    public static void main(String[] args) {
        Logger.getLogger("org.apache").setLevel(Level.WARN);
        List<Integer> inputData = new ArrayList<>();

        inputData.add(25);


        SparkConf conf = new SparkConf().setAppName("startingSpark").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> myRDD = sc.parallelize(inputData);
        Integer result = myRDD.reduce((x, y) -> x + y);

        myRDD.foreach( System.out::println );
        System.out.println(result);

        sc.close();

    }
}
Run Code Online (Sandbox Code Playgroud)

堆栈跟踪:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable...
    at com.virtualpairprogrammers.Main.main(Main.java:26)
Caused by: java.io.NotSerializableException: java.io.PrintStream
Serialization stack:
    - object not serializable (class: java.io.PrintStream, value: java.io.PrintStream@11a82d0f)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)...
Run Code Online (Sandbox Code Playgroud)

Som*_*Som 5

不要使用 Lambda 引用。它将尝试将println(..)的功能传递PrintStream给执行者。请记住,您传递或放入 Spark 闭包(在 Map/Filter/Reduce 等内部)的所有方法都必须序列化。由于println(..)是 的一部分PrintStream,因此该类PrintStream必须被序列化。

传递一个匿名函数,如下所示-

myRDD.foreach(integer -> System.out.println(integer));
Run Code Online (Sandbox Code Playgroud)

Full Example


import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.List;

public class Test63321956 {
    public static void main(String[] args) {
        Logger.getLogger("org.apache").setLevel(Level.WARN);
        List<Integer> inputData = new ArrayList<>();

        inputData.add(25);


        SparkConf conf = new SparkConf().setAppName("startingSpark").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> myRDD = sc.parallelize(inputData);
        Integer result = myRDD.reduce(Integer::sum);

        myRDD.collect().forEach( System.out::println );
        myRDD.foreach(integer -> System.out.println(integer));
        System.out.println(result);
        /**
         * 25
         * 25
         * 25
         */

        sc.close();

    }
}

Run Code Online (Sandbox Code Playgroud)