sta*_*010 6 apache-spark apache-spark-sql spark-dataframe
我使用Spark sql数据帧执行groupby操作,然后计算每个组的数据的平均值和中位数.原始数据量约为1 TB.
val df_result = df.filter($"DayOfWeek" <= 5).groupBy("id").agg(
count("Error").as("Count"),
avg("Error").as("MeanError"),
callUDF("percentile_approx", col("Error"), lit(0.05)).as("5thError"),
callUDF("percentile_approx", col("Error"), lit(0.5)).as("MedianError"),
callUDF("percentile_approx", col("Error"), lit(0.95)).as("95thError")).
filter($"Count" > 1000)
df_result.orderBy(asc("MeanError")).limit(5000)
.write.format("csv").option("header", "true").save("/user/foo.bar/result.csv")
Run Code Online (Sandbox Code Playgroud)
当我运行该查询时,我的工作陷入困境并且无法完成.我该如何调试问题?是否存在导致groupby()卡住的关键不平衡?
评论中已经有很多明智的建议,但这里值得的是我的想法:
1) df.count 有效吗?如果没有,您的问题出在您发布的代码之前(如评论中建议的那样)
2)查看 Spark UI(如评论中建议的那样)-大多数任务是否快速完成,而少数任务则需要很长时间/出现卡住?如果是这样,倾斜可能是您的问题
3)您可能会重写查询,首先只查找每个“id”的“计数”。接下来过滤原始 df,使其仅包含 id 通过广播(以避免 df 随机播放)内部联接出现超过 1000 次的行(如果没有太多 id 出现超过 1000 次的情况)。然后聚合这个较小的数据框并计算所有统计数据。如果计数聚合有效,输出还应该显示是否存在任何显着的数据偏差!
4)有时,将计算分解为更小的步骤,然后写入然后立即从磁盘读取,这帮助我在过去完成了尴尬的工作。如果首先生成 df 的成本很高,也可以加快调试速度。
5)绝对值得升级spark.sql.shuffle.partitions(如评论中建议的);2001 在 Spark 中是一个神奇的数字(spark.sql.shuffle.partitions 的最佳值应该是多少,或者我们在使用 Spark SQL 时如何增加分区?)
6)我也会尝试改变数据量,如果您仅使用一周中的某一天= 1,它是否有效(如评论中所建议的)
7)查询是否在没有percentile_approx的情况下运行?
| 归档时间: |
|
| 查看次数: |
567 次 |
| 最近记录: |