Rap*_*oth 5 apache-spark apache-spark-sql
spark如何确定使用后的分区数orderBy?我一直认为生成的数据框有spark.sql.shuffle.partitions,但这似乎不是真的:
val df = (1 to 10000).map(i => ("a",i)).toDF("n","i").repartition(10).cache
df.orderBy($"i").rdd.getNumPartitions // = 200 (=spark.sql.shuffle.partitions)
df.orderBy($"n").rdd.getNumPartitions // = 2
Run Code Online (Sandbox Code Playgroud)
在这两种情况下,spark 都可以+- Exchange rangepartitioning(i/n ASC NULLS FIRST, 200),那么第二种情况下的分区数怎么会是 2?
spark.sql.shuffle.partitions用作上限。最终的分区数是1 <= partitions <= spark.sql.shuffle.partition。
正如您所提到的,Spark 中的排序通过RangePartitioner. 它试图实现的是将您的数据集划分为指定数量 ( spark.sql.shuffle.partition) 的大致相等的范围。
保证分区后相同的值将位于同一分区中。值得检查RangePartitioning(不是公共 API 的一部分)类文档:
...
中表达式
ordering计算为相同值的所有行都将位于同一分区中
如果不同排序值的数量小于所需的分区数量,即可能范围的数量小于spark.sql.shuffle.partition,您最终会得到较少数量的分区。另外,这是RangePartitionerScaladoc的引用:
RangePartitioner 创建的实际分区数可能与 partitions 参数不同,在采样记录数小于 partitions 值的情况下。
回到您的示例,n是一个常量 ( "a") 并且无法分区。另一方面,i可以有 10,000 个可能的值并被划分为 200 ( =spark.sql.shuffle.partition) 个范围或分区。
请注意,这仅适用于 DataFrame/Dataset API。使用 RDD 时,sortByKey可以明确指定分区数,否则 Spark 将使用当前分区数。
也可以看看:
| 归档时间: |
|
| 查看次数: |
1956 次 |
| 最近记录: |