I am trying to use a lambda expression in a Spark job, and it throws a "java.lang.IllegalArgumentException: Invalid lambda deserialization" exception. The exception is thrown when the code is written like "transform(pRDD -> pRDD.map(t -> t._2))". The code snippets are below.
JavaPairDStream<String, Integer> aggregate = pairRDD.reduceByKey((x, y) -> x + y);
JavaDStream<Integer> con = aggregate.transform(
    (Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>) pRDD -> pRDD.map(
        (Function<Tuple2<String, Integer>, Integer>) t -> t._2));
I also tried casting to an intersection type with Serializable:
JavaPairDStream<String, Integer> aggregate = pairRDD.reduceByKey((x, y) -> x + y);
JavaDStream<Integer> con = aggregate.transform(
    (Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>> & Serializable) pRDD -> pRDD.map(
        (Function<Tuple2<String, Integer>, Integer> & Serializable) t -> t._2));
Neither of the above options works. However, if I pass the object "f" below as the argument instead of the lambda expression "t -> t._2", it works.
Function f = new Function<Tuple2<String, Integer>, Integer>() {
    @Override
    public Integer call(Tuple2<String, Integer> paramT1) throws Exception {
        return paramT1._2;
    }
};
May I know what the correct format is to express that function as a lambda expression?
public static void main(String[] args) {
    Function f = new Function<Tuple2<String, Integer>, Integer>() {
        @Override
        public Integer call(Tuple2<String, Integer> paramT1) throws Exception {
            return paramT1._2;
        }
    };

    JavaStreamingContext ssc = JavaStreamingFactory.getInstance();

    JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 9999);
    // Split each line into words, pair each word with a count of 1, then sum the counts per word.
    JavaDStream<String> words = lines.flatMap(s -> { return Arrays.asList(s.split(" ")); });
    JavaPairDStream<String, Integer> pairRDD = words.mapToPair(x -> new Tuple2<String, Integer>(x, 1));
    JavaPairDStream<String, Integer> aggregate = pairRDD.reduceByKey((x, y) -> x + y);

    // This nested lambda throws "Invalid lambda deserialization":
    JavaDStream<Integer> con = aggregate.transform(
        (Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>) pRDD -> pRDD.map(
            (Function<Tuple2<String, Integer>, Integer>) t -> t._2));
    // JavaDStream<Integer> con = aggregate.transform(pRDD -> pRDD.map(f)); // This works.

    con.print();
    ssc.start();
    ssc.awaitTermination();
}
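For example, would something along these lines be valid? This is only a minimal, untested sketch: the inner lambda is first bound to a variable of Spark's Function interface (which already extends Serializable), so that transform itself receives only a single lambda.

// Sketch only: "second" is a hypothetical variable name introduced here for illustration.
Function<Tuple2<String, Integer>, Integer> second = t -> t._2;
JavaDStream<Integer> con = aggregate.transform(pRDD -> pRDD.map(second));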
I don't know why the lambda doesn't work. Perhaps the problem is that you have a lambda nested inside a lambda. The Spark documentation seems to bear this out.
Compare the example from http://spark.apache.org/docs/latest/programming-guide.html#basics:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
with the example from http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation:
import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
final JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);
JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
    new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
        @Override
        public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
            rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
            ...
        }
    });
The second example uses a Function subclass rather than a lambda, presumably because of the same problem you have found.
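If the nesting really is the culprit, one thing that might be worth trying (a sketch I have not run, modelled on the docs example above) is to keep only the outer function as an anonymous Function subclass and use a lambda for the inner map, so that no lambda is nested inside another lambda:

// Untested sketch: outer function as an anonymous class, inner map as a plain lambda.
JavaDStream<Integer> con = aggregate.transform(
    new Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>() {
        @Override
        public JavaRDD<Integer> call(JavaPairRDD<String, Integer> pRDD) throws Exception {
            return pRDD.map(t -> t._2);
        }
    });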
I don't know whether this is any use to you, but nested lambdas certainly work in Scala. Consider the Scala version of the previous example:
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
})
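Another shape you could experiment with, again only as an untested sketch, is replacing the inner lambda with a reference to a static helper method (the name extractSecond and the enclosing class name MyJob are hypothetical). Be aware that method references on serializable functional interfaces are serialized much like lambdas, so this may run into the same deserialization issue:

// Hypothetical static helper, e.g. in the same class as main:
private static Integer extractSecond(Tuple2<String, Integer> t) {
    return t._2;
}

// Then only the outer transform argument remains a lambda:
JavaDStream<Integer> con = aggregate.transform(pRDD -> pRDD.map(MyJob::extractSecond));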