Spark 窗口函数零偏斜

evy*_*miz 1 skew apache-spark apache-spark-sql pyspark spark-window-function

最近,我在运行 PySpark 作业之一时遇到了问题。在分析 Spark UI 中的阶段时,我注意到运行时间最长的阶段需要 1.2 小时才能运行完,而整个流程运行所需的总时间为 2.5 小时。

SparkUI 阶段选项卡按最长持续时间排序

查看阶段详细信息后,我清楚地发现我面临着严重的数据偏差,导致单个任务运行了整个1.2 小时,而所有其他任务在23 秒内完成。

任务分配显示出非常明显的偏差

总结显示了最长的任务与绝大多数人之间的巨大差异

DAG 显示此阶段涉及窗口函数,它帮助我快速将有问题的区域缩小到几个查询,并找到根本原因 -> 中account使用的列,Window.partitionBy("account")25% 的空值。我没有兴趣计算空帐户的总和,尽管我确实需要涉及的行进行进一步计算,因此我无法在窗口函数之前将它们过滤掉。

这是我的窗口函数查询:

problematic_account_window = Window.partitionBy("account")

sales_with_account_total_df = sales_df.withColumn("sum_sales_per_account", sum(col("price")).over(problematic_account_window))
Run Code Online (Sandbox Code Playgroud)

所以我们找到了罪魁祸首——我们现在能做什么?我们如何解决倾斜和性能问题?

evy*_*miz 5

对于这个问题我们基本上有两种解决方案:

  1. 将初始数据帧分解为 2 个不同的数据帧,第一个数据帧过滤掉空值并计算总和,第二个数据帧仅包含空值且不参与计算。最后我们将两者结合在一起。
  2. 对空值应用加盐技术,以便将空值分布在所有分区上并为阶段提供稳定性。

解决方案一:

account_window = Window.partitionBy("account")

# split to null and non null
non_null_accounts_df = sales_df.where(col("account").isNotNull())
only_null_accounts_df = sales_df.where(col("account").isNull())

# calculate the sum for the non null
sales_with_non_null_accounts_df = non_null_accounts_df.withColumn("sum_sales_per_account", sum(col("price")).over(account_window)

# union the calculated result and the non null df to the final result
sales_with_account_total_df = sales_with_non_null_accounts_df.unionByName(only_null_accounts_df, allowMissingColumns=True)
Run Code Online (Sandbox Code Playgroud)

解决方案2:

SPARK_SHUFFLE_PARTITIONS = spark.conf.get("spark.sql.shuffle.partitions")

modified_sales_df = (sales_df
    # create a random partition value that spans as much as number of shuffle partitions
    .withColumn("random_salt_partition", lit(ceil(rand() * SPARK_SHUFFLE_PARTITIONS)))
    # use the random partition values only in case the account value is null
    .withColumn("salted_account", coalesce(col("account"), col("random_salt_partition")))
    )

# modify the partition to use the salted account
salted_account_window = Window.partitionBy("salted_account")

# use the salted account window to calculate the sum of sales
sales_with_account_total_df = sales_df.withColumn("sum_sales_per_account", sum(col("price")).over(salted_account_window))

Run Code Online (Sandbox Code Playgroud)

在我的解决方案中,我决定使用解决方案 2,因为它没有强迫我为了计算而创建更多数据帧,结果如下:

任务现在相当均匀

任务的最长持续时间现在为 1.2 分钟,而不是 1.2 小时

如上所示,加盐技术有助于解决偏度。完全相同的赛段现在总共运行 5.5 分钟,而不是 1.2 小时。代码中唯一的修改是partitionBy. 显示的比较基于完全相同的集群/节点数量/集群配置。