解决 PySpark 窗口函数中倾斜分区的性能问题

PMe*_*nde 5 apache-spark pyspark

我正在尝试计算 Spark 中的一些移动平均值,但遇到了倾斜分区的问题。这是我尝试执行的简单计算:

获取基础数据

# Variables
one_min = 60
one_hour = 60*one_min
one_day = 24*one_hour
seven_days = 7*one_day
thirty_days = 30*one_day

# Column variables
target_col = "target"
partition_col = "partition_col"


df_base = (
    spark
    .sql("SELECT * FROM {base}".format(base=base_table))
)

df_product1 = (
    df_base
    .where(F.col("product_id") == F.lit("1"))
    .select(
        F.col(target_col).astype("double").alias(target_col),
        F.unix_timestamp("txn_timestamp").alias("window_time"),
        "transaction_id",
        partition_col
    )
)
df_product1.persist()
Run Code Online (Sandbox Code Playgroud)

计算运行平均值

window_lengths = {
    "1day": one_day,
    "7day": seven_days,
    "30day": thirty_days
}
# Create window specs for each type
part_windows = {
    time: Window.partitionBy(F.col(partition_col))
                .orderBy(F.col("window_time").asc())
                .rangeBetween(-secs, -one_min)
    for (time, secs) in window_lengths.items()
}

cols = [
    # Note: not using `avg` as I will be smoothing this at some point
    (F.sum(target_col).over(win)/F.count("*").over(win)).alias(
        "{time}_avg_target".format(time=time)
    )
    for time, win in part_windows.items()
]

sample_df = (
    df_product1
    .repartition(2000, partition_col)
    .sortWithinPartitions(F.col("window_time").asc())
    .select(
        "*",
        *cols
    )
)
Run Code Online (Sandbox Code Playgroud)

现在,我可以获取collect这些数据的有限子集(比如仅 100 行),但如果我尝试运行完整查询,并且聚合运行平均值,Spark 就会卡在一些特别大的分区上。绝大多数分区中的记录少于 100 万条其中只有大约 50 家拥有超过 1M 的记录,只有大约 150 家拥有超过 500K 的记录

然而,少数人拥有超过 250 万条记录(约 10 条),其中 3 个拥有超过 500 万条记录。这些分区已运行超过 12 小时且未能完成。这些分区中的偏差是数据的自然部分,表示分区列的这些不同值中的较大活动。我无法控制该分区列的值的定义。

我使用的是SparkSession启用了动态分配、32G RAM 和每个执行器 4 个核心,最少 4 个执行器。我尝试将执行器增加到 96G,每个执行器有 8 个核心,最少 10 个执行器,但任务仍然没有完成。

这似乎是一个不需要 13 个小时就能完成的计算。DataFramedf_product1包含接近 300M 的记录。

如果还有其他信息有助于解决此问题,请在下面评论。