Suk*_*aar 2 scala apache-spark
我正在创建一个自定义 spark 侦听器并将其添加到正在运行的 sparkContext 中,但是即使在这样做之后,如果我正在执行 DataFrame 操作,在从OutputMetrics获取记录时也会遇到问题。
当我在下面运行时(非 DataFrame):
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
var recordsWrittenCount = 0L
spark.sparkContext.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
synchronized {
recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.recordsWritten
}
}
})
spark.sparkContext.parallelize(1 to 10, 2).saveAsTextFile(outputPath)
println("Records Written: " + recordsWrittenCount)
Run Code Online (Sandbox Code Playgroud)
然后我就可以得到记录
但是当我在下面运行时:
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
var recordsWrittenCount = 0L
spark.sparkContext.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
synchronized {
recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.recordsWritten
}
}
})
import spark.implicits._
val someDF = Seq(
(8, "bat"),
(64, "mouse"),
(-27, "horse")
).toDF("number", "word")
someDF.write.save(outputPath)
println("Records Written: " + recordsWrittenCount)
Run Code Online (Sandbox Code Playgroud)
我得到0 条记录
任何人都可以,请让我知道为什么会这样!!
PS:使用Apache Spark 2.2
我想通了,因为它是 Apache Spark 2.2 中的已知错误,后来在 Apache Spark 2.3 中解决了
https://issues.apache.org/jira/browse/SPARK-22605
| 归档时间: |
|
| 查看次数: |
504 次 |
| 最近记录: |