cri*_*aru 5 apache-spark apache-spark-sql apache-spark-mllib
我有一个镶木地板数据框,具有以下结构:
我必须将 480 个特征列中的每一列替换为其相应的加权移动平均值,窗口为 250。最初,我尝试使用以下简单代码对单个列执行此操作:
var data = sparkSession.read.parquet("s3://data-location")
var window = Window.rowsBetween(-250, Window.currentRow - 1).partitionBy("ID").orderBy("DATE")
data.withColumn("Feature_1", col("Feature_1").divide(avg("Feature_1").over(window))).write.parquet("s3://data-out")
Run Code Online (Sandbox Code Playgroud)
输入数据包含 2000 万行,每个 ID 大约有 4-5000 个关联日期。我已在 AWS EMR 集群(m4.xlarge 实例)上运行此程序,其中一列的结果如下:
我调整了以下设置,希望能缩短总时间:
第二个有助于防止日志中出现的一些溢出,但对实际性能没有任何帮助。
我不明白为什么仅仅 2000 万条记录就需要这么长时间。我知道,为了计算加权移动平均值,它需要执行 20 MX 250(窗口大小)平均值和除法,但对于 16 个核心(第一次运行),我不明白为什么会花费这么长时间。我无法想象剩下的479个特征栏需要多长时间!
我还尝试通过设置来增加默认的随机播放分区:
但即使有 1000 个分区,时间也没有减少。在调用窗口聚合之前还尝试按 ID 和 DATE 对数据进行排序,但没有任何好处。
有什么方法可以改善这个问题,或者窗口函数在我的用例中通常运行缓慢?这只是 20M 行,远不及 Spark 处理其他类型工作负载的能力。
小智 0
您的数据集大小约为 70 GB。如果我对每个 id 的理解正确的话,它会按日期对所有记录进行排序,然后取前面 250 条记录的平均值。由于您需要将其应用于 400 多个列,因此我建议在创建镶木地板时尝试分桶以避免洗牌。编写分桶 parquet 文件需要花费大量时间,但对于所有 480 列的推导,可能不需要 8 分钟 *480 执行时间。
请在创建镶木地板文件时尝试分桶或重新分区和排序,并让我知道它是否有效。