在 Spark 上排名()的有效方法?

csh*_*in9 5 hadoop apache-spark pyspark

我在 PySpark 上有一个三列数据框,我试图在其上执行RANK() OVER(PARTITION BY ... ORDER BY ...)与 SQL 相同的操作。数据框df看起来像:

col1, col2, score
A   , B   , 0.500
...
Run Code Online (Sandbox Code Playgroud)

我知道我可以为此使用窗口函数:

from pyspark.sql.window import Window
from pyspark.sql import functions as F

windowSpec = Window.partitionBy(df['col1']).orderBy(df['score'].desc())
df = df.select('col1', 'col2', 'score', F.rank().over(windowSpec).alias('rnk'))
Run Code Online (Sandbox Code Playgroud)

对于非常大的df,这需要花费大量时间跨分区整理数据。有什么办法可以提高效率吗?(如果没有必要,我不需要使用数据框。)

jav*_*dba 4

添加@DanieldePaula 的评论;在 Spark SQL 中,避免洗牌是很棘手的(/通常不可能)。preservesPartioning=true我已经与 Spark sql 的守护者 Michael Armbrust 讨论过这个问题 - 其基本原理是,如果允许在 Spark sql 查询中使用(来自许多核心操作)的等效项,非专家用户最终可能会得到不正确的结果RDD

在任何情况下,对于您的查询来说,在没有随机播放的情况下获得结果都特别困难 - 因为您的数据甚至还没有正确预分区。

如果您希望获得更好的性能,您需要:

  • 完全退出spark sql
  • 根据所需的窗口对数据进行预分区
  • 通过核心原语手动执行相当于窗口操作的操作RDD

现在,这些听起来有点非常繁重……所以……您可能只想忍受窗口中的性能。