Rin*_*dov 9 apache-spark apache-spark-sql pyspark databricks delta-lake
我知道你可以设置spark.sql.shuffle.partitions和spark.sql.adaptive.advisoryPartitionSizeInBytes。前者不适用于自适应查询执行,而后者由于某种原因仅适用于第一次洗牌,之后它仅使用默认的分区数量,即#cores。
有没有办法配置AQE来调整分区数量,使每个分区不超过100MB?
不确定您正在使用哪个版本的 Spark,但您可以尝试将spark.sql.adaptive.coalescePartitions.minPartitionNum 设置为某个值,一开始您可以尝试使用与 sql.shuffle.partitions 相同的值
我希望通过此设置,您将同时拥有小分区的自动合并+aqe处理倾斜,但是当需要做很多事情时,它将尝试保留spark.sql.adaptive.coalescePartitions.minPartitionNum中的最小分区数
目前我没有看到任何其他方法来强制 Spark 动态计算它以保持分区例如不大于 100 MB
为什么我认为它可能会改变一些事情:
以下是该参数的说明:
val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
buildConf("spark.sql.adaptive.coalescePartitions.minPartitionNum")
.internal()
.doc("(deprecated) The suggested (not guaranteed) minimum number of shuffle partitions " +
"after coalescing. If not set, the default value is the default parallelism of the " +
"Spark cluster. This configuration only has an effect when " +
s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true.")
.version("3.0.0")
.intConf
.checkValue(_ > 0, "The minimum number of partitions must be positive.")
.createOptional
Run Code Online (Sandbox Code Playgroud)
所以它是可选的,现在让我们检查一下它在Spark 代码中的使用情况:
// Ideally, this rule should simply coalesce partitions w.r.t. the target size specified by
// ADVISORY_PARTITION_SIZE_IN_BYTES (default 64MB). To avoid perf regression in AQE, this
// rule by default tries to maximize the parallelism and set the target size to
// `total shuffle size / Spark default parallelism`. In case the `Spark default parallelism`
// is too big, this rule also respect the minimum partition size specified by
// COALESCE_PARTITIONS_MIN_PARTITION_SIZE (default 1MB).
// For history reason, this rule also need to support the config
// COALESCE_PARTITIONS_MIN_PARTITION_NUM. We should remove this config in the future.
val minNumPartitions = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM).getOrElse {
if (conf.getConf(SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST)) {
// We fall back to Spark default parallelism if the minimum number of coalesced partitions
// is not set, so to avoid perf regressions compared to no coalescing.
session.sparkContext.defaultParallelism
} else {
// If we don't need to maximize the parallelism, we set `minPartitionNum` to 1, so that
// the specified advisory partition size will be respected.
1
}
}
Run Code Online (Sandbox Code Playgroud)
看起来,当未设置此参数且spark.sql.adaptive.coalescePartitions.parallelismFirst 设置为true(默认情况下为true)时,Spark 将选择默认并行度作为minPartitionNum。正如您所提到的,这可能就是您看到分区数量等于核心数量的原因
如果我理解正确的话,如果您设置spark.sql.adaptive.coalescePartitions.minPartitionNum,它应该可以解决问题并允许您对分区有更多的控制。
如果它没有帮助或者您期望其他东西,您可以尝试尝试其他 sql.adaptive 参数并检查它们在源代码中的使用方式。
我认为这篇博文可能是一个很好的起点
| 归档时间: |
|
| 查看次数: |
1103 次 |
| 最近记录: |