从 spark udf 记录到驱动程序

tes*_*acc 5 scala apache-spark databricks

我在 spark 中使用的数据块中有一个简单的 UDF。我不能使用 println 或 log4j 或其他东西,因为它会被输出到执行中,我需要在驱动程序中使用它。我有一个非常系统的日志设置

var logMessage = ""

def log(msg: String){
  logMessage += msg + "\n"
}

def writeLog(file: String){
  println("start write")
  println(logMessage)
  println("end write")
}

def warning(msg: String){
  log("*WARNING* " + msg)
}

val CleanText = (s: int) => {
  log("I am in this UDF")
  s+2
}

sqlContext.udf.register("CleanText", CleanText)
Run Code Online (Sandbox Code Playgroud)

我怎样才能让它正常运行并登录到驱动程序?

Tza*_*har 5

Apache Spark 中最接近您要执行的操作的机制是accumulators。您可以在执行程序上累积日志行并在驱动程序中访问结果:

// create a collection accumulator using the spark context:
val logLines: CollectionAccumulator[String] = sc.collectionAccumulator("log")

// log function adds a line to accumulator
def log(msg: String): Unit = logLines.add(msg)

// driver-side function can print the log using accumulator's *value*
def writeLog() {
  import scala.collection.JavaConverters._
  println("start write")
  logLines.value.asScala.foreach(println)
  println("end write")
}

val CleanText = udf((s: Int) => {
  log(s"I am in this UDF, got: $s")
  s+2
})

// use UDF in some transformation:
Seq(1, 2).toDF("a").select(CleanText($"a")).show()

writeLog()    
// prints: 
// start write
// I am in this UDF, got: 1
// I am in this UDF, got: 2
// end write
Run Code Online (Sandbox Code Playgroud)

但是:这并不是真正推荐的,尤其是不用于日志目的。如果你登录每条记录,这个累加器最终会让你的驱动程序崩溃OutOfMemoryError或者只是让你的速度变慢。

由于您使用的是 Databricks,我会检查它们支持日志聚合的选项,或者只是使用 Spark UI 查看执行程序日志。