如何配置 Spark 在 join 或 groupby 后调整输出分区的数量?

Rin*_*dov 9 apache-spark apache-spark-sql pyspark databricks delta-lake

我知道你可以设置spark.sql.shuffle.partitionsspark.sql.adaptive.advisoryPartitionSizeInBytes。前者不适用于自适应查询执行,而后者由于某种原因仅适用于第一次洗牌,之后它仅使用默认的分区数量,即#cores。

有没有办法配置AQE来调整分区数量,使每个分区不超过100MB?

M_S*_*M_S 2

不确定您正在使用哪个版本的 Spark,但您可以尝试将spark.sql.adaptive.coalescePartitions.minPartitionNum 设置为某个值,一开始您可以尝试使用与 sql.shuffle.partitions 相同的值

我希望通过此设置,您将同时拥有小分区的自动合并+aqe处理倾斜,但是当需要做很多事情时,它将尝试保留spark.sql.adaptive.coalescePartitions.minPartitionNum中的最小分区数

目前我没有看到任何其他方法来强制 Spark 动态计算它以保持分区例如不大于 100 MB

为什么我认为它可能会改变一些事情:

以下是该参数的说明:

  val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
    buildConf("spark.sql.adaptive.coalescePartitions.minPartitionNum")
      .internal()
      .doc("(deprecated) The suggested (not guaranteed) minimum number of shuffle partitions " +
        "after coalescing. If not set, the default value is the default parallelism of the " +
        "Spark cluster. This configuration only has an effect when " +
        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
        s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true.")
      .version("3.0.0")
      .intConf
      .checkValue(_ > 0, "The minimum number of partitions must be positive.")
      .createOptional
Run Code Online (Sandbox Code Playgroud)

所以它是可选的,现在让我们检查一下它在Spark 代码中的使用情况:

// Ideally, this rule should simply coalesce partitions w.r.t. the target size specified by
// ADVISORY_PARTITION_SIZE_IN_BYTES (default 64MB). To avoid perf regression in AQE, this
// rule by default tries to maximize the parallelism and set the target size to
// `total shuffle size / Spark default parallelism`. In case the `Spark default parallelism`
// is too big, this rule also respect the minimum partition size specified by
// COALESCE_PARTITIONS_MIN_PARTITION_SIZE (default 1MB).
// For history reason, this rule also need to support the config
// COALESCE_PARTITIONS_MIN_PARTITION_NUM. We should remove this config in the future.
val minNumPartitions = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM).getOrElse {
  if (conf.getConf(SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST)) {
    // We fall back to Spark default parallelism if the minimum number of coalesced partitions
    // is not set, so to avoid perf regressions compared to no coalescing.
    session.sparkContext.defaultParallelism
  } else {
    // If we don't need to maximize the parallelism, we set `minPartitionNum` to 1, so that
    // the specified advisory partition size will be respected.
    1
  }
}
Run Code Online (Sandbox Code Playgroud)

看起来,当未设置此参数且spark.sql.adaptive.coalescePartitions.parallelismFirst 设置为true(默认情况下为true)时,Spark 将选择默认并行度作为minPartitionNum。正如您所提到的,这可能就是您看到分区数量等于核心数量的原因

如果我理解正确的话,如果您设置spark.sql.adaptive.coalescePartitions.minPartitionNum,它应该可以解决问题并允许您对分区有更多的控制。

如果它没有帮助或者您期望其他东西,您可以尝试尝试其他 sql.adaptive 参数并检查它们在源代码中的使用方式。

我认为这篇博文可能是一个很好的起点