我有代码表明他的目标是从 1.5B 记录中取出 10M 最旧的记录。
我尝试用它来做orderBy,但它从未完成,然后我尝试用窗口函数来做,15 分钟后完成。
我理解orderBy每个执行器都会获取部分数据,对其进行排序并将前 10M 传递给最终执行器。因为 10M> 分区大小,我们将所有数据传递给最终执行器,然后需要很长时间才能完成。
我不明白窗口解决方案是如何工作的?在单个执行器开始运行之前,shuffle 中发生了什么?这种洗牌如何帮助单个执行器中的排序更快地工作?我将不胜感激任何有助于理解窗口函数在这种情况下如何在后台工作的帮助。
这是窗口函数的代码:
df = sess.sql("select * from table")
last_update_window = Window.orderBy(f.col("last_update").asc())
df = df.withColumn('row_number', f.row_number().over(last_update_window))
df = df.where(f.col('row_number') <= 1000000000)
Run Code Online (Sandbox Code Playgroud)
这是代码orderBy:
df = sess.sql("select * from table")
df = df.orderBy(f.col('last_update').asc()).limit(100000000)
Run Code Online (Sandbox Code Playgroud)
下面是执行窗口函数时的计划图:
最近,我在运行 PySpark 作业之一时遇到了问题。在分析 Spark UI 中的阶段时,我注意到运行时间最长的阶段需要 1.2 小时才能运行完,而整个流程运行所需的总时间为 2.5 小时。
查看阶段详细信息后,我清楚地发现我面临着严重的数据偏差,导致单个任务运行了整个1.2 小时,而所有其他任务在23 秒内完成。
DAG 显示此阶段涉及窗口函数,它帮助我快速将有问题的区域缩小到几个查询,并找到根本原因 -> 中account使用的列,Window.partitionBy("account")有25% 的空值。我没有兴趣计算空帐户的总和,尽管我确实需要涉及的行进行进一步计算,因此我无法在窗口函数之前将它们过滤掉。
这是我的窗口函数查询:
problematic_account_window = Window.partitionBy("account")
sales_with_account_total_df = sales_df.withColumn("sum_sales_per_account", sum(col("price")).over(problematic_account_window))
Run Code Online (Sandbox Code Playgroud)
所以我们找到了罪魁祸首——我们现在能做什么?我们如何解决倾斜和性能问题?
skew apache-spark apache-spark-sql pyspark spark-window-function