为什么我的shuffle分区在group by操作时不是200(默认)?(火花2.4.5)

ani*_*711 5 amazon-emr apache-spark apache-spark-sql pyspark

我对 Spark 很陌生,并试图了解它的内部结构。因此,我从 s3 读取一个 50MB 的小 parquet 文件并执行分组,然后保存回 s3。当我观察 Spark UI 时,我可以看到为此创建了 3 个阶段,

阶段 0:加载(1 个任务)

第 1 阶段:用于分组的 shufflequerystage(12 个任务)

第 2 阶段:保存(coalescedshufflereader)(26 个任务)

代码示例:

df = spark.read.format("parquet").load(src_loc)
df_agg = df.groupby(grp_attribute)\                             
 .agg(F.sum("no_of_launches").alias("no_of_launchesGroup")
df_agg.write.mode("overwrite").parquet(target_loc)
Run Code Online (Sandbox Code Playgroud)

我使用带有 1 个主节点、3 个核心节点(每个节点有 4 个 vcore)的 EMR 实例。因此,默认并行度为 12。我不会在运行时更改任何配置。但我不明白为什么最后阶段会创建26个任务?据我了解,默认情况下,随机分区应为 200。附加 UI 的屏幕截图。

在此输入图像描述

the*_*tom 4

我在 Databricks 上使用 Spark 2.4.5 尝试了类似的逻辑。

我观察到spark.conf.set('spark.sql.adaptive.enabled', 'true'),我的分区的最终数量是 2。

我观察到,使用spark.conf.set('spark.sql.adaptive.enabled', 'false')spark.conf.set('spark.sql.shuffle.partitions', 75),我的分区的最终数量是 75。

使用print(df_agg.rdd.getNumPartitions())揭示了这一点。

因此,Spark UI 上的作业输出并未反映这一点。可能最后会发生重新分区。有趣,但并不是真正的问题。