Spark:如何获取写入的行数?

mga*_*ido 8 apache-spark

我想知道是否有办法知道Spark保存操作写入的行数.我知道在编写之前对RDD进行计数就足够了,但是我想知道是否有办法在没有这样做的情况下获得相同的信息.

谢谢你,马可

zer*_*323 9

如果您真的想要,可以添加自定义侦听器并从中提取写入行数outputMetrics.非常简单的示例可能如下所示:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

var recordsWrittenCount = 0L

sc.addSparkListener(new SparkListener() { 
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    synchronized {
      recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.recordsWritten 
    }
  }
})

sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")
recordsWrittenCount
// Long = 10
Run Code Online (Sandbox Code Playgroud)

但这部分API仅供内部使用.

  • 使用“spark.write.avro(...)”写入记录时有没有办法做同样的事情 (2认同)

sam*_*est 7

接受的答案更符合OP的特定需求(如各种评论中明确指出的那样),但这个答案适合大多数人.

最有效的方法是使用累加器:http://spark.apache.org/docs/latest/programming-guide.html#accumulators

val accum = sc.accumulator(0L)

data.map { x =>
  accum += 1
  x
}
.saveAsTextFile(path)

val count = accum.value
Run Code Online (Sandbox Code Playgroud)

然后你可以将它包装在一个有用的皮条客中:

implicit class PimpedStringRDD(rdd: RDD[String]) {
  def saveAsTextFileAndCount(p: String): Long = {
    val accum = rdd.sparkContext.accumulator(0L)

    rdd.map { x =>
      accum += 1
      x
    }
    .saveAsTextFile(p)

    accum.value
  }
}
Run Code Online (Sandbox Code Playgroud)

所以你可以做到

val count = data.saveAsTextFileAndCount(path)
Run Code Online (Sandbox Code Playgroud)

  • 我知道这种方法,但我想避免它出于两个主要原因:在转换中使用它意味着在某些失败的情况下结果不可信; 无论如何都有(小)开销.我只是想知道是否有一个计数器可访问,就像mapreduce一样,因为在web UI中显示的行数... (3认同)