pat*_*ski 4 apache-spark databricks
有人可以解释一下 Apache Spark 中的 openCostInBytes 吗?我可以在文档中看到定义,但我不明白它到底如何影响读取文件。我真的应该关心这个吗?如果是,我应该如何调整它?
spark.files.openCostInBytes会影响输入数据将被读入的分区数量。确切的计算可以在Filepartition.scala中找到。
在回答这个问题时它的存在方式,计算如下:
def maxSplitBytes(
sparkSession: SparkSession,
selectedPartitions: Seq[PartitionDirectory]): Long = {
val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
.getOrElse(sparkSession.leafNodeDefaultParallelism)
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / minPartitionNum
Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
}
Run Code Online (Sandbox Code Playgroud)
所以最后一行是有趣的。我们取最小值:
defaultMaxSplitBytes,来自spark.sql.files.maxPartitionBytes且默认为 128 * 1024 * 1024openCostInBytes,来自spark.sql.files.openCostInBytes,默认为 4 * 1024bytesPerCore这是totalBytes / minPartitionNum. minPartitionNum来自spark.default.parallelism默认情况,这等于您的核心总数现在我们知道了这一点,我们可以尝试理解此计算的 3 种边缘情况(考虑到参数的默认值):
defaultMaxSplitBytes,这是因为我们的 abytesPerCore大于其他值。仅当我们处理大文件时才会发生这种情况。如此之大,如果我们将数据公平地分配给所有核心,它将比defaultMaxSplitBytes. 所以这里我们限制每个分区的大小。bytesPerCore,则表示小于 128MB 但大于 4MB。在这种情况下,我们将数据公平地分配到所有核心上。openCostInBytes,则表示bytesPerCore非常小,小于 4MB。由于每个分区都有打开成本,因此我们希望限制创建的分区数量。所以在这种情况下,我们限制创建的分区数量通过理解这一点,我们发现这个值仅在您的集群中的数据较小时才有效(即如果您的集群data size / nr of cores in cluster很小)
希望这可以帮助!
| 归档时间: |
|
| 查看次数: |
1716 次 |
| 最近记录: |