ash*_*der 1 java serialization task apache-spark
我目前刚刚开始学习 Apache Spark,并且有一些代码我不太明白为什么无法编译。它说我发送到 myRDD forEach 的任务不可序列化,但是我正在观看的教程也做了类似的事情。任何想法或线索将不胜感激。
public class Main {
public static void main(String[] args) {
Logger.getLogger("org.apache").setLevel(Level.WARN);
List<Integer> inputData = new ArrayList<>();
inputData.add(25);
SparkConf conf = new SparkConf().setAppName("startingSpark").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Integer> myRDD = sc.parallelize(inputData);
Integer result = myRDD.reduce((x, y) -> x + y);
myRDD.foreach( System.out::println );
System.out.println(result);
sc.close();
}
}
Run Code Online (Sandbox Code Playgroud)
堆栈跟踪:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable...
at com.virtualpairprogrammers.Main.main(Main.java:26)
Caused by: java.io.NotSerializableException: java.io.PrintStream
Serialization stack:
- object not serializable (class: java.io.PrintStream, value: java.io.PrintStream@11a82d0f)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)...
Run Code Online (Sandbox Code Playgroud)
不要使用 Lambda 引用。它将尝试将println(..)的功能传递PrintStream给执行者。请记住,您传递或放入 Spark 闭包(在 Map/Filter/Reduce 等内部)的所有方法都必须序列化。由于println(..)是 的一部分PrintStream,因此该类PrintStream必须被序列化。
传递一个匿名函数,如下所示-
myRDD.foreach(integer -> System.out.println(integer));
Run Code Online (Sandbox Code Playgroud)
Full Example
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.ArrayList;
import java.util.List;
public class Test63321956 {
public static void main(String[] args) {
Logger.getLogger("org.apache").setLevel(Level.WARN);
List<Integer> inputData = new ArrayList<>();
inputData.add(25);
SparkConf conf = new SparkConf().setAppName("startingSpark").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Integer> myRDD = sc.parallelize(inputData);
Integer result = myRDD.reduce(Integer::sum);
myRDD.collect().forEach( System.out::println );
myRDD.foreach(integer -> System.out.println(integer));
System.out.println(result);
/**
* 25
* 25
* 25
*/
sc.close();
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
496 次 |
| 最近记录: |