Mit*_*ril 1 apache-spark pyspark
注意:这不是一个询问合并和重新分区之间区别的问题,有很多问题谈论这个,我的不同。
我有一份 pysaprk 工作
df = spark.read.parquet(input_path)
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
...
return pdf
df = df.repartition(1000, 'store_id', 'product_id')
df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)
df1 = df1.withColumnRenamed('y', 'yhat')
print('Partition number: %s' % df.rdd.getNumPartitions())
df1.write.parquet(output_path, mode='overwrite')
Run Code Online (Sandbox Code Playgroud)
默认200个分区需要大内存,所以我将重新分区更改为1000。
由于输出只有44M,我尝试使用coalesce以避免太多的小文件拖慢hdfs。我所做的只是.coalesce(20)在之前添加.write.parquet(output_path, mode='overwrite'):
df = spark.read.parquet(input_path)
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
...
return pdf
df = df.repartition(1000, 'store_id', 'product_id')
df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)
df1 = df1.withColumnRenamed('y', 'yhat')
print('Partition number: %s' % df.rdd.getNumPartitions()) # 1000 here
df1.coalesce(20).write.parquet(output_path, mode='overwrite')
Run Code Online (Sandbox Code Playgroud)
然后spark webui显示:
看起来只有 20 个任务正在运行。
当 repartion(1000) 时,并行度取决于我的 vcore 数量,这里是 36。我可以直观地跟踪进度(进度条大小为 1000 )。在 colonesce(20) 后,之前的 repartion(1000) 失去了功能,并行度下降到 20 ,也失去了直觉。并且添加coalesce(20)会导致整个作业卡住并在没有通知的情况下失败。
更改coalesce(20)为repartition(20)有效,但根据文档,coalesce(20)效率更高,不应导致此类问题。
我想要更高的并行度,并且只有结果合并到 20 。正确的做法是什么?
coalesceSpark 优化器将其视为窄转换,因此它将创建从 groupby 到输出的单个 WholeStageCodegen 阶段,从而将并行度限制为 20。
repartition是一个广泛的转换(即强制洗牌),当您使用它而不是coalesceif 添加一个新的输出阶段但保留 groupby-train 并行性时。
repartition(20)在您的用例中是一个非常合理的选择(洗牌很小,因此成本相当低)。
另一种选择是明确阻止 Spark 优化器合并预测和输出阶段,例如在合并之前使用cache或:persist
# Your groupby code here
from pyspark.storagelevel import StorageLevel
df1.persist(StorageLevel.MEMORY_ONLY)\
.coalesce(20)\
.write.parquet(output_path, mode='overwrite')
Run Code Online (Sandbox Code Playgroud)
考虑到输出大小较小,MEMORY_ONLY persist +合并应该比重新分区更快,但是当输出大小增加时,这并不成立
| 归档时间: |
|
| 查看次数: |
1669 次 |
| 最近记录: |