Spark 窗口性能问题

cri*_*aru 5 apache-spark apache-spark-sql apache-spark-mllib

我有一个镶木地板数据框,具有以下结构:

  1. ID字符串
  2. 日期 日期
  3. 480 个 Double 类型的其他特征列

我必须将 480 个特征列中的每一列替换为其相应的加权移动平均值,窗口为 250。最初,我尝试使用以下简单代码对单个列执行此操作:

var data = sparkSession.read.parquet("s3://data-location")
var window = Window.rowsBetween(-250, Window.currentRow - 1).partitionBy("ID").orderBy("DATE")
data.withColumn("Feature_1", col("Feature_1").divide(avg("Feature_1").over(window))).write.parquet("s3://data-out")
Run Code Online (Sandbox Code Playgroud)

输入数据包含 2000 万行,每个 ID 大约有 4-5000 个关联日期。我已在 AWS EMR 集群(m4.xlarge 实例)上运行此程序,其中一列的结果如下:

  • 4 个执行器 X 4 个核心 X 10 GB + 1 GB 用于纱线开销(因此每个任务 2.5 GB,16 个并发运行任务),花费了 14 分钟
  • 8 个执行器 X 4 个核心 X 10GB + 1 GB 纱线开销(因此每个任务 2.5GB,32 个并发运行任务),耗时 8 分钟

我调整了以下设置,希望能缩短总时间:

  • 火花.内存.存储分数 0.02
  • Spark.sql.windowExec.buffer.in.memory.threshold 100000
  • Spark.sql.constraintPropagation.enabled false

第二个有助于防止日志中出现的一些溢出,但对实际性能没有任何帮助。

我不明白为什么仅仅 2000 万条记录就需要这么长时间。我知道,为了计算加权移动平均值,它需要执行 20 MX 250(窗口大小)平均值和除法,但对于 16 个核心(第一次运行),我不明白为什么会花费这么长时间。我无法想象剩下的479个特征栏需要多长时间!

我还尝试通过设置来增加默认的随机播放分区:

  • Spark.sql.shuffle.partitions 1000

但即使有 1000 个分区,时间也没有减少。在调用窗口聚合之前还尝试按 ID 和 DATE 对数据进行排序,但没有任何好处。

有什么方法可以改善这个问题,或者窗口函数在我的用例中通常运行缓慢?这只是 20M 行,远不及 Spark 处理其他类型工作负载的能力。

小智 0

您的数据集大小约为 70 GB。如果我对每个 id 的理解正确的话,它会按日期对所有记录进行排序,然后取前面 250 条记录的平均值。由于您需要将其应用于 400 多个列,因此我建议在创建镶木地板时尝试分桶以避免洗牌。编写分桶 parquet 文件需要花费大量时间,但对于所有 480 列的推导,可能不需要 8 分钟 *480 执行时间。

请在创建镶木地板文件时尝试分桶或重新分区和排序,并让我知道它是否有效。