编写以下java程序来试验apache spark.
程序尝试从相应的文件中读取正面和负面单词列表,将其与主文件进行比较并相应地过滤结果.
import java.io.Serializable;
import java.io.FileNotFoundException;
import java.io.File;
import java.util.*;
import java.util.Iterator;
import java.util.List;
import java.util.List;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
public class SimpleApp implements Serializable{
public static void main(String[] args) {
String logFile = "/tmp/master.txt"; // Should be some file on your system
String positive = "/tmp/positive.txt"; // Should be some file on your system
String negative = "/tmp/negative.txt"; // Should be some file on your system
JavaSparkContext sc = new JavaSparkContext("local[4]", "Twitter Analyzer", "/home/welcome/Downloads/spark-1.1.0/", new String[]{"target/scala-2.10/Simple-assembly-0.1.0.jar"});
JavaRDD<String> positiveComments …Run Code Online (Sandbox Code Playgroud) 这是一个有效的代码示例:
JavaPairDStream<String, String> messages = KafkaUtils.createStream(javaStreamingContext, zkQuorum, group, topicMap);
messages.print();
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
Run Code Online (Sandbox Code Playgroud)
我得到以下错误:
ERROR:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:140)
at org.apache.spark.streaming.api.java.JavaPairDStream.map(JavaPairDStream.scala:46)
Run Code Online (Sandbox Code Playgroud)