Rap*_*oth 5 scala apache-spark apache-spark-sql
使用Spark 1.6.1,我想调用UDF的次数。我之所以要这样做,是因为我有一个非常昂贵的UDF(每次调用大约1秒),并且我怀疑UDF的调用频率比数据框中的记录数更多,这使我的spark工作变得比必要的慢。
尽管无法重现这种情况,但我想出了一个简单的示例,该示例显示了对UDF的调用次数似乎与行数不同(在这里:更少),那怎么可能呢?
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.udf
object Demo extends App {
val conf = new SparkConf().setMaster("local[4]").setAppName("Demo")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val callCounter = sc.accumulator(0)
val df= sc.parallelize(1 to 10000,numSlices = 100).toDF("value")
println(df.count) // gives 10000
val myudf = udf((d:Int) => {callCounter.add(1);d})
val res = df.withColumn("result",myudf($"value")).cache
println(res.select($"result").collect().size) // gives 10000
println(callCounter.value) // gives 9941
}
Run Code Online (Sandbox Code Playgroud)
如果使用累加器不是调用UDF计数的正确方法,我还能怎么做?
注意:在我的实际Spark-Job中,获得的呼叫计数大约是实际记录数的1.7倍。
Spark 应用程序应该定义 main() 方法,而不是扩展 scala.App。scala.App 的子类可能无法正常工作。
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.udf
object Demo extends App {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]")
val sc = new SparkContext(conf)
// [...]
}
}
Run Code Online (Sandbox Code Playgroud)
这应该可以解决你的问题。