我想知道是否有办法知道Spark保存操作写入的行数.我知道在编写之前对RDD进行计数就足够了,但是我想知道是否有办法在没有这样做的情况下获得相同的信息.
谢谢你,马可
如果您真的想要,可以添加自定义侦听器并从中提取写入行数outputMetrics.非常简单的示例可能如下所示:
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
var recordsWrittenCount = 0L
sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
synchronized {
recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.recordsWritten
}
}
})
sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")
recordsWrittenCount
// Long = 10
Run Code Online (Sandbox Code Playgroud)
但这部分API仅供内部使用.
接受的答案更符合OP的特定需求(如各种评论中明确指出的那样),但这个答案适合大多数人.
最有效的方法是使用累加器:http://spark.apache.org/docs/latest/programming-guide.html#accumulators
val accum = sc.accumulator(0L)
data.map { x =>
accum += 1
x
}
.saveAsTextFile(path)
val count = accum.value
Run Code Online (Sandbox Code Playgroud)
然后你可以将它包装在一个有用的皮条客中:
implicit class PimpedStringRDD(rdd: RDD[String]) {
def saveAsTextFileAndCount(p: String): Long = {
val accum = rdd.sparkContext.accumulator(0L)
rdd.map { x =>
accum += 1
x
}
.saveAsTextFile(p)
accum.value
}
}
Run Code Online (Sandbox Code Playgroud)
所以你可以做到
val count = data.saveAsTextFileAndCount(path)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
6627 次 |
| 最近记录: |