tes*_*acc 5 scala apache-spark databricks
我在 spark 中使用的数据块中有一个简单的 UDF。我不能使用 println 或 log4j 或其他东西,因为它会被输出到执行中,我需要在驱动程序中使用它。我有一个非常系统的日志设置
var logMessage = ""
def log(msg: String){
logMessage += msg + "\n"
}
def writeLog(file: String){
println("start write")
println(logMessage)
println("end write")
}
def warning(msg: String){
log("*WARNING* " + msg)
}
val CleanText = (s: int) => {
log("I am in this UDF")
s+2
}
sqlContext.udf.register("CleanText", CleanText)
Run Code Online (Sandbox Code Playgroud)
我怎样才能让它正常运行并登录到驱动程序?
Apache Spark 中最接近您要执行的操作的机制是accumulators。您可以在执行程序上累积日志行并在驱动程序中访问结果:
// create a collection accumulator using the spark context:
val logLines: CollectionAccumulator[String] = sc.collectionAccumulator("log")
// log function adds a line to accumulator
def log(msg: String): Unit = logLines.add(msg)
// driver-side function can print the log using accumulator's *value*
def writeLog() {
import scala.collection.JavaConverters._
println("start write")
logLines.value.asScala.foreach(println)
println("end write")
}
val CleanText = udf((s: Int) => {
log(s"I am in this UDF, got: $s")
s+2
})
// use UDF in some transformation:
Seq(1, 2).toDF("a").select(CleanText($"a")).show()
writeLog()
// prints:
// start write
// I am in this UDF, got: 1
// I am in this UDF, got: 2
// end write
Run Code Online (Sandbox Code Playgroud)
但是:这并不是真正推荐的,尤其是不用于日志目的。如果你登录每条记录,这个累加器最终会让你的驱动程序崩溃OutOfMemoryError或者只是让你的速度变慢。
由于您使用的是 Databricks,我会检查它们支持日志聚合的选项,或者只是使用 Spark UI 查看执行程序日志。
| 归档时间: |
|
| 查看次数: |
4254 次 |
| 最近记录: |