Spark数据帧组的平均值和中位数未完成

sta*_*010 6 apache-spark apache-spark-sql spark-dataframe

我使用Spark sql数据帧执行groupby操作,然后计算每个组的数据的平均值和中位数.原始数据量约为1 TB.

val df_result = df.filter($"DayOfWeek" <= 5).groupBy("id").agg(
        count("Error").as("Count"), 
        avg("Error").as("MeanError"), 
        callUDF("percentile_approx", col("Error"), lit(0.05)).as("5thError"), 
        callUDF("percentile_approx", col("Error"), lit(0.5)).as("MedianError"), 
        callUDF("percentile_approx", col("Error"), lit(0.95)).as("95thError")).
    filter($"Count" > 1000)


df_result.orderBy(asc("MeanError")).limit(5000)
    .write.format("csv").option("header", "true").save("/user/foo.bar/result.csv")
Run Code Online (Sandbox Code Playgroud)

当我运行该查询时,我的工作陷入困境并且无法完成.我该如何调试问题?是否存在导致groupby()卡住的关键不平衡?

use*_*459 0

评论中已经有很多明智的建议,但这里值得的是我的想法:

1) df.count 有效吗?如果没有,您的问题出在您发布的代码之前(如评论中建议的那样)

2)查看 Spark UI(如评论中建议的那样)-大多数任务是否快速完成,而少数任务则需要很长时间/出现卡住?如果是这样,倾斜可能是您的问题

3)您可能会重写查询,首先只查找每个“id”的“计数”。接下来过滤原始 df,使其仅包含 id 通过广播(以避免 df 随机播放)内部联接出现超过 1000 次的行(如果没有太多 id 出现超过 1000 次的情况)。然后聚合这个较小的数据框并计算所有统计数据。如果计数聚合有效,输出还应该显示是否存在任何显着的数据偏差!

4)有时,将计算分解为更小的步骤,然后写入然后立即从磁盘读取,这帮助我在过去完成了尴尬的工作。如果首先生成 df 的成本很高,也可以加快调试速度。

5)绝对值得升级spark.sql.shuffle.partitions(如评论中建议的);2001 在 Spark 中是一个神奇的数字(spark.sql.shuffle.partitions 的最佳值应该是多少,或者我们在使用 Spark SQL 时如何增加分区?

6)我也会尝试改变数据量,如果您仅使用一周中的某一天= 1,它是否有效(如评论中所建议的)

7)查询是否在没有percentile_approx的情况下运行?