PMe*_*nde 5 apache-spark pyspark
我正在尝试计算 Spark 中的一些移动平均值,但遇到了倾斜分区的问题。这是我尝试执行的简单计算:
# Variables
one_min = 60
one_hour = 60*one_min
one_day = 24*one_hour
seven_days = 7*one_day
thirty_days = 30*one_day
# Column variables
target_col = "target"
partition_col = "partition_col"
df_base = (
spark
.sql("SELECT * FROM {base}".format(base=base_table))
)
df_product1 = (
df_base
.where(F.col("product_id") == F.lit("1"))
.select(
F.col(target_col).astype("double").alias(target_col),
F.unix_timestamp("txn_timestamp").alias("window_time"),
"transaction_id",
partition_col
)
)
df_product1.persist()
Run Code Online (Sandbox Code Playgroud)
window_lengths = {
"1day": one_day,
"7day": seven_days,
"30day": thirty_days
}
# Create window specs for each type
part_windows = {
time: Window.partitionBy(F.col(partition_col))
.orderBy(F.col("window_time").asc())
.rangeBetween(-secs, -one_min)
for (time, secs) in window_lengths.items()
}
cols = [
# Note: not using `avg` as I will be smoothing this at some point
(F.sum(target_col).over(win)/F.count("*").over(win)).alias(
"{time}_avg_target".format(time=time)
)
for time, win in part_windows.items()
]
sample_df = (
df_product1
.repartition(2000, partition_col)
.sortWithinPartitions(F.col("window_time").asc())
.select(
"*",
*cols
)
)
Run Code Online (Sandbox Code Playgroud)
现在,我可以获取collect
这些数据的有限子集(比如仅 100 行),但如果我尝试运行完整查询,并且聚合运行平均值,Spark 就会卡在一些特别大的分区上。绝大多数分区中的记录少于 100 万条。其中只有大约 50 家拥有超过 1M 的记录,只有大约 150 家拥有超过 500K 的记录
然而,少数人拥有超过 250 万条记录(约 10 条),其中 3 个拥有超过 500 万条记录。这些分区已运行超过 12 小时且未能完成。这些分区中的偏差是数据的自然部分,表示分区列的这些不同值中的较大活动。我无法控制该分区列的值的定义。
我使用的是SparkSession
启用了动态分配、32G RAM 和每个执行器 4 个核心,最少 4 个执行器。我尝试将执行器增加到 96G,每个执行器有 8 个核心,最少 10 个执行器,但任务仍然没有完成。
这似乎是一个不需要 13 个小时就能完成的计算。DataFramedf_product1
包含接近 300M 的记录。
如果还有其他信息有助于解决此问题,请在下面评论。
归档时间: |
|
查看次数: |
270 次 |
最近记录: |