什么是 openCostInBytes?

pat*_*ski 4 apache-spark databricks

有人可以解释一下 Apache Spark 中的 openCostInBytes 吗?我可以在文档中看到定义,但我不明白它到底如何影响读取文件。我真的应该关心这个吗?如果是,我应该如何调整它?

Koe*_*dlt 7

spark.files.openCostInBytes会影响输入数据将被读入的分区数量。确切的计算可以在Filepartition.scala中找到。

在回答这个问题时它的存在方式,计算如下:

def maxSplitBytes(
    sparkSession: SparkSession,
    selectedPartitions: Seq[PartitionDirectory]): Long = {
  val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
  val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
  val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
    .getOrElse(sparkSession.leafNodeDefaultParallelism)
  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
  val bytesPerCore = totalBytes / minPartitionNum

  Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
}
Run Code Online (Sandbox Code Playgroud)

所以最后一行是有趣的。我们取最小值:

  • defaultMaxSplitBytes,来自spark.sql.files.maxPartitionBytes且默认为 128 * 1024 * 1024
  • 最大值:
    • openCostInBytes,来自spark.sql.files.openCostInBytes,默认为 4 * 1024
    • bytesPerCore这是totalBytes / minPartitionNum. minPartitionNum来自spark.default.parallelism默认情况,这等于您的核心总数

现在我们知道了这一点,我们可以尝试理解此计算的 3 种边缘情况(考虑到参数的默认值):

  • 如果结果是 的值defaultMaxSplitBytes,这是因为我们的 abytesPerCore大于其他值。仅当我们处理大文件时才会发生这种情况。如此之大,如果我们将数据公平地分配给所有核心,它将比defaultMaxSplitBytes. 所以这里我们限制每个分区的大小。
  • 如果结果为 的值bytesPerCore,则表示小于 128MB 但大于 4MB。在这种情况下,我们将数据公平地分配到所有核心上。
  • 如果结果是 的值openCostInBytes,则表示bytesPerCore非常小,小于 4MB。由于每个分区都有打开成本,因此我们希望限制创建的分区数量。所以在这种情况下,我们限制创建的分区数量

通过理解这一点,我们发现这个值仅在您的集群中的数据较小时才有效(即如果您的集群data size / nr of cores in cluster很小)

希望这可以帮助!