排序后的数据帧分区数?

Rap*_*oth 5 apache-spark apache-spark-sql

spark如何确定使用后的分区数orderBy?我一直认为生成的数据框有spark.sql.shuffle.partitions,但这似乎不是真的:

val df = (1 to 10000).map(i => ("a",i)).toDF("n","i").repartition(10).cache

df.orderBy($"i").rdd.getNumPartitions // = 200 (=spark.sql.shuffle.partitions)
df.orderBy($"n").rdd.getNumPartitions // = 2 
Run Code Online (Sandbox Code Playgroud)

在这两种情况下,spark 都可以+- Exchange rangepartitioning(i/n ASC NULLS FIRST, 200),那么第二种情况下的分区数怎么会是 2?

Ser*_*kov 8

spark.sql.shuffle.partitions用作上限。最终的分区数是1 <= partitions <= spark.sql.shuffle.partition


正如您所提到的,Spark 中的排序通过RangePartitioner. 它试图实现的是将您的数据集划分为指定数量 ( spark.sql.shuffle.partition) 的大致相等的范围。

保证分区后相同的值将位于同一分区中。值得检查RangePartitioning(不是公共 API 的一部分)类文档:

...

中表达式ordering计算为相同值的所有行都将位于同一分区中

如果不同排序值的数量小于所需的分区数量,即可能范围的数量小于spark.sql.shuffle.partition,您最终会得到较少数量的分区。另外,这是RangePartitionerScaladoc的引用:

RangePartitioner 创建的实际分区数可能与 partitions 参数不同,在采样记录数小于 partitions 值的情况下。

回到您的示例,n是一个常量 ( "a") 并且无法分区。另一方面,i可以有 10,000 个可能的值并被划分为 200 ( =spark.sql.shuffle.partition) 个范围或分区。

请注意,这仅适用于 DataFrame/Dataset API。使用 RDD 时,sortByKey可以明确指定分区数,否则 Spark 将使用当前分区数。

也可以看看: