Spark - 迭代算法的奇怪行为

Chr*_*ian 4 iteration algorithm apache-spark

我尝试用Spark编写迭代算法.该算法包含一个主循环,其中使用不同的并行性火花命令.如果在每次迭代中只使用一个spark命令,一切正常,但是当使用多个命令时,Spark的行为变得非常奇怪.主要问题是在具有2个项目的rdd上的map命令不会导致2但在许多函数调用中.

它看起来像Spark正在迭代x中执行从迭代1到迭代(x-1)的每个命令.但不仅在循环的最后一次迭代中,而且在循环的每次迭代中!

我构建了一个小例子来重现行为(使用Java 1.8和Spark 1.6.1)

首先是rdd中使用的数据结构:

public class Data implements Serializable {
    private static final long serialVersionUID = -6367920689454127925L;
    private String id;
    private Integer value;

    public Data(final String id, final Integer value) {
        super();
        this.id = id;
        this.value = value;
    }

    public String getId() {
        return this.id;
    }

    public Integer getValue() {
        return this.value;
    }

    public void setValue(final Integer value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "Data [id=" + this.id + ", value=" + this.value + "]";
    }
}
Run Code Online (Sandbox Code Playgroud)

对于max命令,我们使用comperator:

public class MyComparator implements java.util.Comparator<Data>, Serializable {

    private static final long serialVersionUID = 1383816444011380318L;

    private static final double EPSILON = 0.001;

    public MyComparator() {
    }

    @Override
    public int compare(final Data x, final Data y) {
        if (Math.abs(x.getValue() - y.getValue()) < EPSILON) {
            return 0;
        } else if (x.getValue() < y.getValue()) {
            return -1;
        } else {
            return 1;
        }
    }

}
Run Code Online (Sandbox Code Playgroud)

现在主要的程序与算法:

public class Job implements Serializable {

    private static final long serialVersionUID = -1828983500553835114L;

    // Spark Settings
    private static final String APPNAME = "DebugApp - Main";
    private static final String SPARKMASTER = "local[1]";
    private static final int MAX_ITERATIONS = 4;

    public Job() {
    }

    public static void main(final String[] args) {
        final Job job = new Job();
        job.run();
    }

    public void run() {
        final JavaSparkContext sparkContext = createSparkContext();
        final List<Data> dataSet = new ArrayList<Data>();
        dataSet.add(new Data("0", 0));
        dataSet.add(new Data("1", 0));

        JavaRDD<Data> dataParallel = sparkContext.parallelize(dataSet);

        // We use an accumulator to count the number of calls within the map command
        final Accumulator<Integer> accum = sparkContext.accumulator(0);

        final MyComparator comparator = new MyComparator();
        for (int iterations = 0; iterations < MAX_ITERATIONS; iterations++) {
            // If the item which should be updated is selected using the iteration counter everything works fine...
            // final String idToUpdate = new Integer(iterations % 2).toString();

            // ..., but if the element with the minimal value is selected the number of executions in the map command increases.
            final String idToUpdate = dataParallel.min(comparator).getId();
            dataParallel = dataParallel.map(data -> {
                accum.add(1); // Counting the number of function calls.
                return updateData(data, idToUpdate);
            });
        }

        final List<Data> resultData = dataParallel.collect();
        System.out.println("Accumulator: " + accum.value());
        for (Data data : resultData) {
            System.out.println(data.toString());
        }
    }

    private Data updateData(final Data data, final String id) {
        if (data.getId().equals(id)) {
            data.setValue(data.getValue() + 1);
        }
        return data;
    }

    private JavaSparkContext createSparkContext() {
        final SparkConf conf = new SparkConf().setAppName(APPNAME).setMaster(SPARKMASTER);
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.set("spark.kryo.registrator", "de.eprofessional.bidmanager2.engine.serialization.KryoRegistratorWrapper");
        return new JavaSparkContext(conf);

    }
}
Run Code Online (Sandbox Code Playgroud)

我希望每次迭代我们都能获得2个函数调用,如果要使用迭代计数器选择要更新的项目,则会出现这种情况(参见累加器结果1).但是如果使用min命令选择元素,我们会得到不同的结果(参见累加器结果2):

MAX_ITERATIONS | 累加器结果1 | 累积结果2
----------------------------------------------- --------------
1 .............. | .......... 2 ......... .. | .......... 2
2 .............. | .......... 4 ......... .. | .......... 6
3 .............. | .......... 6 ......... .. | ......... 12
4 .............. | .......... 8 .......... .| ......... 20

有人对map命令中的其他调用有解释吗?

maa*_*asg 6

RDD上的操作定义了所谓的"血统".每个RDD都有对其父级(或者父级,例如连接)的引用.在实现RDD时访问该谱系.这构成了RDD中弹性的基础:Spark可以通过在给定的数据分区上执行所述沿袭来重新创建数据集上的所有操作以获得结果.

这里发生的事情是我们正在联系.map电话.如果我们展开循环,我们会看到类似的东西:

iter1 -> rdd.map(f)
iter2 -> rdd.map(f).map(f) 
iter3 -> rdd.map(f).map(f).map(f)
...
Run Code Online (Sandbox Code Playgroud)

我们可以通过rdd.toDebugString在循环内发出一个来看到这一点.

因此,底线:每次传递实际上都会为前一阶段添加一个血统步骤.如果我们想破坏那个谱系,我们应该checkpoint在每次迭代时使用RDD来"记住"最后的中间结果.cache具有类似的效果,但不能保证评估停止(如果没有更多的内存要缓存).因此,RDD实现可以进一步评估谱系.