相关疑难解决方法(0)

当DF包含太多列时,Spark UDF会在每条记录中多次调用

我正在使用Spark 1.6.1并遇到一个奇怪的行为:我在包含一些输入数据的数据帧上运行一个带有一些繁重计算(物理模拟)的UDF,并构建一个包含许多列的结果-Dataframe(~40 ).

奇怪的是,在这种情况下,我的输入数据帧的每个记录不止一次调用我的UDF(经常是1.6倍),我发现这是不可接受的,因为它非常昂贵.如果我减少列数(例如减少到20),则此行为将消失.

我设法写下一个小脚本,演示了这个:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.udf


object Demo {

  case class Result(a: Double)

  def main(args: Array[String]): Unit = {

    val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val numRuns = sc.accumulator(0) // to count the number of udf calls

    val myUdf = udf((i:Int) => {numRuns.add(1);Result(i.toDouble)})

    val data = sc.parallelize((1 to 100), numSlices = 5).toDF("id")

    // get results of UDF
    var results = data
      .withColumn("tmp", myUdf($"id"))
      .withColumn("result", $"tmp.a")


    // add …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

13
推荐指数
2
解决办法
1999
查看次数

标签 统计

apache-spark ×1

apache-spark-sql ×1

scala ×1