Apache Spark:不执行联合操作

Ani*_*nil 5 java apache-spark

我知道火花做懒惰的评价.

但这是预期的行为?使用以下程序,输出为20.

但如果是打印声明

  System.out.println("/////////////////// After "+MainRDD.count());
Run Code Online (Sandbox Code Playgroud)

如果没有注释,输出将为40

我不是在我的应用程序中这样做,但只是为了演示,我创建了这个程序..

 SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JavaSparkSQL");
JavaSparkContext sc = new JavaSparkContext(sparkConf);

JavaRDD<Integer> MainRDD;
ArrayList<Integer> list = new ArrayList<>();
JavaRDD<Integer> tmp;
for (int i = 0; i < 20; i++) {
    list.add(i);
}

MainRDD = sc.parallelize(list);// MainRDD.union(tmp);
System.out.println("//////////////////////First "+MainRDD.count());

list.clear();
for (int i = 20; i < 25; i++) {
    for (int j = 1; j < 5; j++) {
        list.add(i*j);
    }
    tmp = sc.parallelize(list);

  //      System.out.println("/////////////////// Before "+MainRDD.count());
    MainRDD = MainRDD.union(tmp);
//        System.out.println("/////////////////// After "+MainRDD.count());
    list.clear();
}

System.out.println("/////////////////// last "+MainRDD.count());
}
Run Code Online (Sandbox Code Playgroud)

zer*_*323 2

问题的根源在于用于填充 RDD 的可变数据结构。当您调用它时sc.parallelize(list),它不会捕获ArrayList. 由于您clear在实际评估数据时输出循环时调用,因此根本没有数据。

说实话,我不知道为什么当你调用count方法时这种行为会发生变化。由于 RDD 没有缓存,我的猜测是 Spark 或 JVM 内部结构的问题,但我什至不会尝试猜测那里到底发生了什么。也许更聪明的人能够知道这种行为的确切原因。

只是为了说明发生了什么:

val arr = Array(1, 2, 3)

val rdd = sc.parallelize(arr)

(0 until 3).foreach(arr(_) = 99)
val tmp = sc.parallelize(arr)

tmp.union(rdd).collect
// Array[Int] = Array(99, 99, 99, 99, 99, 99) 
Run Code Online (Sandbox Code Playgroud)

val arr = Array(1, 2, 3)

val rdd = sc.parallelize(arr)
rdd.count()
// Long = 3

(0 until 3).foreach(arr(_) = 99)
val tmp = sc.parallelize(arr)

tmp.union(rdd).collect
// Array[Int] = Array(99, 99, 99, 1, 2, 3)

sc.getRDDStorageInfo
// Array[org.apache.spark.storage.RDDInfo] = Array()
Run Code Online (Sandbox Code Playgroud)