evy*_*miz 1 skew apache-spark apache-spark-sql pyspark spark-window-function
最近,我在运行 PySpark 作业之一时遇到了问题。在分析 Spark UI 中的阶段时,我注意到运行时间最长的阶段需要 1.2 小时才能运行完,而整个流程运行所需的总时间为 2.5 小时。
查看阶段详细信息后,我清楚地发现我面临着严重的数据偏差,导致单个任务运行了整个1.2 小时,而所有其他任务在23 秒内完成。
DAG 显示此阶段涉及窗口函数,它帮助我快速将有问题的区域缩小到几个查询,并找到根本原因 -> 中account使用的列,Window.partitionBy("account")有25% 的空值。我没有兴趣计算空帐户的总和,尽管我确实需要涉及的行进行进一步计算,因此我无法在窗口函数之前将它们过滤掉。
这是我的窗口函数查询:
problematic_account_window = Window.partitionBy("account")
sales_with_account_total_df = sales_df.withColumn("sum_sales_per_account", sum(col("price")).over(problematic_account_window))
Run Code Online (Sandbox Code Playgroud)
所以我们找到了罪魁祸首——我们现在能做什么?我们如何解决倾斜和性能问题?
对于这个问题我们基本上有两种解决方案:
解决方案一:
account_window = Window.partitionBy("account")
# split to null and non null
non_null_accounts_df = sales_df.where(col("account").isNotNull())
only_null_accounts_df = sales_df.where(col("account").isNull())
# calculate the sum for the non null
sales_with_non_null_accounts_df = non_null_accounts_df.withColumn("sum_sales_per_account", sum(col("price")).over(account_window)
# union the calculated result and the non null df to the final result
sales_with_account_total_df = sales_with_non_null_accounts_df.unionByName(only_null_accounts_df, allowMissingColumns=True)
Run Code Online (Sandbox Code Playgroud)
解决方案2:
SPARK_SHUFFLE_PARTITIONS = spark.conf.get("spark.sql.shuffle.partitions")
modified_sales_df = (sales_df
# create a random partition value that spans as much as number of shuffle partitions
.withColumn("random_salt_partition", lit(ceil(rand() * SPARK_SHUFFLE_PARTITIONS)))
# use the random partition values only in case the account value is null
.withColumn("salted_account", coalesce(col("account"), col("random_salt_partition")))
)
# modify the partition to use the salted account
salted_account_window = Window.partitionBy("salted_account")
# use the salted account window to calculate the sum of sales
sales_with_account_total_df = sales_df.withColumn("sum_sales_per_account", sum(col("price")).over(salted_account_window))
Run Code Online (Sandbox Code Playgroud)
在我的解决方案中,我决定使用解决方案 2,因为它没有强迫我为了计算而创建更多数据帧,结果如下:
如上所示,加盐技术有助于解决偏度。完全相同的赛段现在总共运行 5.5 分钟,而不是 1.2 小时。代码中唯一的修改是partitionBy. 显示的比较基于完全相同的集群/节点数量/集群配置。
| 归档时间: |
|
| 查看次数: |
828 次 |
| 最近记录: |