Yul*_*Sh. 4 java mapreduce apache-spark
有人能举例说明在Java中正确使用mapPartitionsWithIndex吗?我发现了很多Scala示例,但缺少Java.我的理解是正确的,当使用此函数时,单独的节点将处理单独的分区.
我收到以下错误
method mapPartitionsWithIndex in class JavaRDD<T> cannot be applied to given types;
JavaRDD<String> rdd = sc.textFile(filename).mapPartitionsWithIndex
required: Function2<Integer,Iterator<String>,Iterator<R>>,boolean
found: <anonymous Function2<Integer,Iterator<String>,Iterator<JavaRDD<String>>>>
Run Code Online (Sandbox Code Playgroud)
做的时候
JavaRDD<String> rdd = sc.textFile(filename).mapPartitionsWithIndex(
new Function2<Integer, Iterator<String>, Iterator<JavaRDD<String>> >() {
@Override
public Iterator<JavaRDD<String>> call(Integer ind, String s) {
Run Code Online (Sandbox Code Playgroud)
这是我用来删除csv文件第一行的代码:
JavaRDD<String> rawInputRdd = sparkContext.textFile(dataFile);
Function2 removeHeader= new Function2<Integer, Iterator<String>, Iterator<String>>(){
@Override
public Iterator<String> call(Integer ind, Iterator<String> iterator) throws Exception {
if(ind==0 && iterator.hasNext()){
iterator.next();
return iterator;
}else
return iterator;
}
};
JavaRDD<String> inputRdd = rawInputRdd.mapPartitionsWithIndex(removeHeader, false);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5229 次 |
| 最近记录: |