Ala*_*ain 10 performance window-functions apache-spark apache-spark-sql pyspark
我正在开发一个 pyspark 例程来插入配置表中的缺失值。
想象一个从 0 到 50,000 的配置值表。用户指定中间的几个数据点(例如 0、50、100、500、2000、500000),然后我们对余数进行插值。我的解决方案主要遵循这篇博客文章,只是我没有使用任何 UDF。
在对其性能进行故障排除时(大约需要 3 分钟),我发现一个特定的窗口函数占用了所有时间,而我所做的其他所有事情只花费了几秒钟。
这是主要感兴趣的区域 - 我使用窗口函数来填写上一个和下一个用户提供的配置值:
from pyspark.sql import Window, functions as F
# Create partition windows that are required to generate new rows from the ones provided
win_last = Window.partitionBy('PORT_TYPE', 'loss_process').orderBy('rank').rowsBetween(Window.unboundedPreceding, 0)
win_next = Window.partitionBy('PORT_TYPE', 'loss_process').orderBy('rank').rowsBetween(0, Window.unboundedFollowing)
# Join back in the provided config table to populate the "known" scale factors
df_part1 = (df_scale_factors_template
.join(df_users_config, ['PORT_TYPE', 'loss_process', 'rank'], 'leftouter')
# Add computed columns that can lookup the prior config and next config for each missing value
.withColumn('last_rank', F.last( F.col('rank'), ignorenulls=True).over(win_last))
.withColumn('last_sf', F.last( F.col('scale_factor'), ignorenulls=True).over(win_last))
).cache()
debug_log_dataframe(df_part1 , 'df_part1') # Force a .count() and time Part1
df_part2 = (df_part1
.withColumn('next_rank', F.first(F.col('rank'), ignorenulls=True).over(win_next))
.withColumn('next_sf', F.first(F.col('scale_factor'), ignorenulls=True).over(win_next))
).cache()
debug_log_dataframe(df_part2 , 'df_part2') # Force a .count() and time Part2
df_part3 = (df_part2
# Implements standard linear interpolation: y = y1 + ((y2-y1)/(x2-x1)) * (x-x1)
.withColumn('scale_factor',
F.when(F.col('last_rank')==F.col('next_rank'), F.col('last_sf')) # Handle div/0 case
.otherwise(F.col('last_sf') + ((F.col('next_sf')-F.col('last_sf'))/(F.col('next_rank')-F.col('last_rank'))) * (F.col('rank')-F.col('last_rank'))))
.select('PORT_TYPE', 'loss_process', 'rank', 'scale_factor')
).cache()
debug_log_dataframe(df_part3, 'df_part3', explain: True) # Force a .count() and time Part3
Run Code Online (Sandbox Code Playgroud)
上面的内容曾经是一个链式数据框语句,但我后来将其分为 3 部分,以便我可以隔离花费很长时间的部分。结果是:
Part 1: Generated 8 columns and 300006 rows in 0.65 secondsPart 2: Generated 10 columns and 300006 rows in 189.55 secondsPart 3: Generated 4 columns and 300006 rows in 0.24 secondsfirst()over的时间比over 的Window.unboundedFollowing时间长得多?last()Window.unboundedPreceding一些避免问题/疑虑的注意事项:
debug_log_dataframe只是一个辅助函数,用于强制数据帧执行/缓存 a.Count()并计时以产生上述日志。cache()通过在后续运行计时之前明确地排除了重用的影响unpersist()- 我对上述测量非常有信心。物理计划:为了帮助回答这个问题,我调用explain()第 3 部分的结果来确认缓存是否达到了预期的效果等。这里进行了注释以突出显示问题区域:

我能看到的唯一区别是:
last) show RunningWindowFunction,而调用nextjust readWindow我尝试过的一些事情:
first语句占用总时间的一半(约 98 秒)first都是慢速数据帧。我觉得我已经做了尽可能多的挖掘,并且希望 Spark 专家能够看一看,知道这个时间是从哪里来的。
没有回答问题的解决方案
在尝试各种方法来加快我的日常工作时,我突然想到尝试重新重写我的用法,使其仅是具有相反排序顺序first()的用法。last()
所以重写这个:
win_next = (Window.partitionBy('PORT_TYPE', 'loss_process')
.orderBy('rank').rowsBetween(0, Window.unboundedFollowing))
df_part2 = (df_part1
.withColumn('next_rank', F.first(F.col('rank'), ignorenulls=True).over(win_next))
.withColumn('next_sf', F.first(F.col('scale_factor'), ignorenulls=True).over(win_next))
)
Run Code Online (Sandbox Code Playgroud)
像这样:
win_next = (Window.partitionBy('PORT_TYPE', 'loss_process')
.orderBy(F.desc('rank')).rowsBetween(Window.unboundedPreceding, 0))
df_part2 = (df_part1
.withColumn('next_rank', F.last(F.col('rank'), ignorenulls=True).over(win_next))
.withColumn('next_sf', F.last(F.col('scale_factor'), ignorenulls=True).over(win_next))
)
Run Code Online (Sandbox Code Playgroud)
令我惊讶的是,这实际上解决了性能问题,现在整个数据帧只需 3 秒即可生成。我很高兴,但还是很烦恼。
正如我所预测的那样,查询计划现在在创建接下来的两列之前包括一个新的排序步骤,并且它们已从前两列更改Window为 RunningWindowFunction前两列。这是新计划(不再将代码分解为 3 个独立的缓存部分,因为这只是为了解决性能问题):

至于问题:
为什么我通过 Window.unboundedFollowing 调用first() 比通过Window.unboundedPreceding 调用last() 花费的时间长得多?
出于学术原因,我希望有人仍然可以回答这个问题
| 归档时间: |
|
| 查看次数: |
1844 次 |
| 最近记录: |