org.apache.spark.SparkException: Task not serializable

Tags: scala, apache-kafka, apache-spark

Here is the code sample in question:

JavaPairDStream<String, String> messages = KafkaUtils.createStream(javaStreamingContext, zkQuorum, group, topicMap);
messages.print();
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

I get the following error:

ERROR:
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:140)
    at org.apache.spark.streaming.api.java.JavaPairDStream.map(JavaPairDStream.scala:46)

Answer (15 votes):

Since you define your map function as an anonymous inner class, the containing class must also be Serializable. Define the map function as a separate top-level class, or make it a static nested class. From the Java documentation (http://docs.oracle.com/javase/8/docs/platform/serialization/spec/serial-arch.html):

Note - Serialization of inner classes (i.e., nested classes that are not static member classes), including local and anonymous classes, is strongly discouraged for several reasons. Because inner classes declared in non-static contexts contain implicit non-transient references to enclosing class instances, serializing such an inner class instance will result in serialization of its associated outer class instance as well.
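The same capture mechanism can be demonstrated without Spark. The sketch below (all class names are hypothetical, and `Driver` stands in for whatever non-serializable class builds the streaming job) shows that an anonymous inner class referencing an enclosing instance fails plain Java serialization, while a static nested class, as the answer recommends, serializes cleanly:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {

    // Stand-in for the non-serializable class that sets up the job.
    static class Driver {
        String prefix = "v-";

        // The anonymous class reads `prefix`, so it keeps an implicit
        // reference to the enclosing Driver instance (Driver.this).
        Serializable badMapper() {
            return new Serializable() {
                @Override
                public String toString() { return prefix; }
            };
        }
    }

    // Static nested class: no hidden outer reference, so it can be
    // serialized on its own (this is the fix suggested in the answer).
    static class GoodMapper implements Serializable {
        public String call(String value) { return value; }
    }

    // Try to serialize an object; return false on NotSerializableException.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Serializing the anonymous mapper drags in Driver, which fails.
        System.out.println(serializes(new Driver().badMapper())); // false
        // The static nested mapper serializes fine.
        System.out.println(serializes(new GoodMapper()));         // true
    }
}
```

In the Spark code above, the equivalent change is to replace the anonymous `new Function<...>() { ... }` with `messages.map(new GoodMapper())`-style usage, where the mapper is a static nested class implementing Spark's `Function` interface.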