Spark coalesce(20) 覆盖 repartition(1000).groupby(xxx).apply(func) 的并行度

Mit*_*ril 1 apache-spark pyspark

注意:这不是一个询问合并和重新分区之间区别的问题,有很多问题谈论这个,我的不同。

我有一份 pysaprk 工作

df = spark.read.parquet(input_path)

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    ...
    return pdf

df = df.repartition(1000, 'store_id', 'product_id')
df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)

df1 = df1.withColumnRenamed('y', 'yhat')

print('Partition number: %s' % df.rdd.getNumPartitions())

df1.write.parquet(output_path, mode='overwrite')
Run Code Online (Sandbox Code Playgroud)

默认200个分区需要大内存,所以我将重新分区更改为1000。

Spark WebUI 上的作业详细信息如下所示: 在此输入图像描述

由于输出只有44M,我尝试使用coalesce以避免太多的小文件拖慢hdfs。我所做的只是.coalesce(20)在之前添加.write.parquet(output_path, mode='overwrite')

df = spark.read.parquet(input_path)

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    ...
    return pdf

df = df.repartition(1000, 'store_id', 'product_id')
df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)

df1 = df1.withColumnRenamed('y', 'yhat')

print('Partition number: %s' % df.rdd.getNumPartitions())  # 1000 here

df1.coalesce(20).write.parquet(output_path, mode='overwrite')
Run Code Online (Sandbox Code Playgroud)

然后spark webui显示:

在此输入图像描述

看起来只有 20 个任务正在运行。

当 repartion(1000) 时,并行度取决于我的 vcore 数量,这里是 36。我可以直观地跟踪进度(进度条大小为 1000 )。在 colonesce(20) 后,之前的 repartion(1000) 失去了功能,并行度下降到 20 ,也失去了直觉。并且添加coalesce(20)会导致整个作业卡住并在没有通知的情况下失败。

更改coalesce(20)repartition(20)有效,但根据文档,coalesce(20)效率更高,不应导致此类问题。

我想要更高的并行度,并且只有结果合并到 20 。正确的做法是什么?

rlu*_*uta 6

coalesceSpark 优化器将其视为窄转换,因此它将创建从 groupby 到输出的单个 WholeStageCodegen 阶段,从而将并行度限制为 20。

repartition是一个广泛的转换(即强制洗牌),当您使用它而不是coalesceif 添加一个新的输出阶段但保留 groupby-train 并行性时。

repartition(20)在您的用例中是一个非常合理的选择(洗牌很小,因此成本相当低)。

另一种选择是明确阻止 Spark 优化器合并预测和输出阶段,例如在合并之前使用cache或:persist

# Your groupby code here

from pyspark.storagelevel import StorageLevel

df1.persist(StorageLevel.MEMORY_ONLY)\
   .coalesce(20)\
   .write.parquet(output_path, mode='overwrite')
Run Code Online (Sandbox Code Playgroud)

考虑到输出大小较小,MEMORY_ONLY persist +合并应该比重新分区更快,但是当输出大小增加时,这并不成立

  • AFAIK,DataFrame/Dataset 上的“coalesce”方法一直很狭窄。你可以查看[源代码](https://github.com/apache/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala# L2286): `coalesce` 使用 Repartition 运算符,并显式禁用 shuffle,而 `repartition` 启用了 shuffle。[RDD 实现](https://github.com/apache/spark/blob/branch-2.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala# L417) (3认同)