Eda*_*ame 17 scala println accumulator apache-spark
我有以下代码:
val blueCount = sc.accumulator[Long](0)
val output = input.map { data =>
for (value <- data.getValues()) {
if (record.getEnum() == DataEnum.BLUE) {
blueCount += 1
println("Enum = BLUE : " + value.toString()
}
}
data
}.persist(StorageLevel.MEMORY_ONLY_SER)
output.saveAsTextFile("myOutput")
Run Code Online (Sandbox Code Playgroud)
然后blueCount不为零,但我没有println()输出!我在这里错过了什么吗?谢谢!
Alb*_*nto 18
这是一个概念性问题......
想象一下,你有一个庞大的集群,由许多工人组成,让我们说n工人和那些工人存储一个RDD或者的分区DataFrame,想象你map在这些数据中开始一个任务,并且在里面map你有一个print声明,首先:
这些问题太多了,因此设计师/维护者在apache-spark逻辑上决定放弃对print任何map-reduce操作(包括accumulators甚至broadcast变量)内部语句的任何支持.
这也很有意义,因为Spark是一种专为非常大的数据集而设计的语言.虽然打印对测试和调试很有用,但您不希望打印DataFrame或RDD的每一行,因为它们构建为具有数百万或数十亿行!那么,当你甚至不想首先打印时,为什么要处理这些复杂的问题呢?
为了证明这一点,您可以运行此scala代码,例如:
// Let's create a simple RDD
val rdd = sc.parallelize(1 to 10000)
def printStuff(x:Int):Int = {
println(x)
x + 1
}
// It doesn't print anything! because of a logic design limitation!
rdd.map(printStuff)
// But you can print the RDD by doing the following:
rdd.take(10).foreach(println)
Run Code Online (Sandbox Code Playgroud)