按行迭代Java RDD

Kat*_*ler 9 java apache-spark rdd

我想迭代一个字符串的RDD并为每个字符串"做一些事情".输出应该是double[][].这是一个带有for循环的例子.我知道我需要使用(我认为)foreachJava RDD 的功能.但是,我不知道如何理解语法.文档不是特别有用.我没有Java 8.

这是一个例子,如果我可以使用常规for循环,我想做什么.

public class PCA {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("PCA Example");
        SparkContext sc = new SparkContext(conf);

        RDD<String> data = sc.textFile("my/directory/my/dataset.txt", 0);

        // here is the "type" of code I would like to execute
        // 30 because I have 30 variables
        double[][] vals = new double[data.count()][30];

        double[] temp;
        for (int i = 0; i < data.count(); i++) {
            temp = splitStringtoDoubles(data[i]);
            vals[i] = temp;
        }
    }

    private static double[] splitStringtoDoubles(String s) {
        String[] splitVals = s.split("\\t");
        Double[] vals = new Double[splitVals.length];
        for (int i = 0; i < splitVals.length; i++) {
            vals[i] = Double.parseDouble(splitVals[i]);
        }
    }

}
Run Code Online (Sandbox Code Playgroud)

我明白这foreach似乎需要一个具有void返回类型的函数.不知道如何使用它.这是我到目前为止所尝试的(显然语法错误):

    double[][] matrix = new double[data.count()][30];
    foreach(String s : data) {
        String[] splitvals = s.split("\\t");
        double[] vals = Double.parseDouble(splitvals);
        matrix[s] = vals; 
    }
Run Code Online (Sandbox Code Playgroud)

Bal*_*duz 4

正如mattinbits在评论中所说,您需要 amap而不是 a foreach,因为您想要返回值。amap基本上所做的就是转换数据:对于 RDD 的每一行,执行一项操作并为每一行返回一个值。你所需要的可以这样实现:

import org.apache.spark.api.java.function.Function;

...

SparkConf conf = new SparkConf().setAppName("PCA Example");
SparkContext sc = new SparkContext(conf);

JavaRDD<String> data = sc.textFile("clean-sl-mix-with-labels.txt",0).toJavaRDD();
JavaRDD<double[]> whatYouWantRdd = data.map(new Function<String, double[]>() {
    @Override
    public double[] call(String row) throws Exception {
        return splitStringtoDoubles(row);
    }

    private double[] splitStringtoDoubles(String s) {
        String[] splitVals = s.split("\\t");
        Double[] vals = new Double[splitVals.length];
        for(int i=0; i < splitVals.length; i++) {
            vals[i] = Double.parseDouble(splitVals[i]);
        }
        return vals;
    }
});

List<double[]> whatYouWant = whatYouWantRdd.collect();
Run Code Online (Sandbox Code Playgroud)

为了了解 Spark 的工作原理,您可以在 RDD 上执行操作或转换。例如,这里我们使用map函数来转换 RDD。您需要自己创建这个函数,这次使用匿名函数org.apache.spark.api.java.function.Function,这会强制您重写方法call,在该方法中您接收 RDD 的一行并返回一个值。