hir*_*ryu 10 apache-spark spark-dataframe
我试图找到一种可靠的方法来以编程方式计算Spark数据帧的大小(以字节为单位).
原因是我希望有一种方法来计算"最佳"分区数量("最佳"可能意味着不同的东西:它可能意味着 具有最佳分区大小,或者在写入Parquet时产生最佳文件大小表 - 但两者都可以假设为数据帧大小的某些线性函数.换句话说,我想调用coalesce(n)或repartition(n)在数据帧上,其中n不是固定数字,而是数据帧大小的函数.
关于SO的其他主题建议使用SizeEstimator.estimatefrom org.apache.spark.util来获取数据帧的字节大小,但我得到的结果是不一致的.
首先,我将数据帧保存到内存中:
df.cache().count
Run Code Online (Sandbox Code Playgroud)
Spark UI在"存储"选项卡中显示大小为4.8GB.然后,我运行以下命令来获取大小SizeEstimator:
import org.apache.spark.util.SizeEstimator
SizeEstimator.estimate(df)
Run Code Online (Sandbox Code Playgroud)
这给出了115'715'808字节= ~116MB的结果.但是,应用于SizeEstimator不同的对象会导致非常不同的结果.例如,我尝试分别为数据帧中的每一行计算大小并将它们相加:
df.map(row => SizeEstimator.estimate(row.asInstanceOf[ AnyRef ])).reduce(_+_)
Run Code Online (Sandbox Code Playgroud)
这导致12'084'698'256字节= ~12GB的大小.或者,我可以尝试应用于SizeEstimator每个分区:
df.mapPartitions(
iterator => Seq(SizeEstimator.estimate(
iterator.toList.map(row => row.asInstanceOf[ AnyRef ]))).toIterator
).reduce(_+_)
Run Code Online (Sandbox Code Playgroud)
这又导致10'792'965'376字节的不同大小=〜10.8GB.
我知道存在内存优化/内存开销,但在执行这些测试之后,我没有看到如何SizeEstimator使用它来获得足够好的数据帧大小估计(以及因此分区大小或结果Parquet文件大小).
SizeEstimator为了获得对数据帧大小或其分区的良好估计,应用适当的方法(如果有的话)是什么?如果没有,这里建议的方法是什么?
不幸的是,我无法从中获得可靠的估计SizeEstimator,但我可以找到另一种策略 - 如果数据帧被缓存,我们可以从queryExecution如下提取其大小:
df.cache.foreach(_=>_)
val catalyst_plan = df.queryExecution.logical
val df_size_in_bytes = spark.sessionState.executePlan(
catalyst_plan).optimizedPlan.stats.sizeInBytes
Run Code Online (Sandbox Code Playgroud)
对于示例数据帧,这恰好提供了4.8GB(这也与写入未压缩的Parquet表时的文件大小相对应).
这样做的缺点是需要缓存数据帧,但在我的情况下它不是问题.
SizeEstimator返回对象在 JVM 堆上占用的字节数。这包括对象引用的对象,实际对象大小几乎总是小得多。
您观察到的大小差异是因为当您在 JVM 上创建新对象时,引用也会占用内存,而这会被计算在内。
在此处查看文档
https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.util.SizeEstimator $
除了您已经尝试过的大小估计器(很好的洞察力)。
下面是另一种选择
RDDInfo[] getRDDStorageInfo()
Run Code Online (Sandbox Code Playgroud)
返回有关缓存哪些 RDD 的信息,如果它们在mem 中或在两者中,它们占用了多少空间等。
实际上火花存储选项卡使用这个。Spark 文档
/**
* :: DeveloperApi ::
* Return information about what RDDs are cached, if they are in mem or on disk, how much space
* they take, etc.
*/
@DeveloperApi
def getRDDStorageInfo: Array[RDDInfo] = {
getRDDStorageInfo(_ => true)
}
private[spark] def getRDDStorageInfo(filter: RDD[_] => Boolean): Array[RDDInfo] = {
assertNotStopped()
val rddInfos = persistentRdds.values.filter(filter).map(RDDInfo.fromRdd).toArray
rddInfos.foreach { rddInfo =>
val rddId = rddInfo.id
val rddStorageInfo = statusStore.asOption(statusStore.rdd(rddId))
rddInfo.numCachedPartitions = rddStorageInfo.map(_.numCachedPartitions).getOrElse(0)
rddInfo.memSize = rddStorageInfo.map(_.memoryUsed).getOrElse(0L)
rddInfo.diskSize = rddStorageInfo.map(_.diskUsed).getOrElse(0L)
}
rddInfos.filter(_.isCached)
}
Run Code Online (Sandbox Code Playgroud)
yourRDD.toDebugString从 RDD 也使用这个。代码在这里
在我看来,要在每个分区中获得最佳记录数并检查您的重新分区是否正确并且它们是否均匀分布,我建议尝试如下...并调整您的重新分区数。然后测量分区的大小......会更明智。解决此类问题
yourdf.rdd.mapPartitionsWithIndex{case (index,rows) => Iterator((index,rows.size))}
.toDF("PartitionNumber","NumberOfRecordsPerPartition")
.show
Run Code Online (Sandbox Code Playgroud)
或现有的火花功能(基于火花版本)
import org.apache.spark.sql.functions._
df.withColumn("partitionId", sparkPartitionId()).groupBy("partitionId").count.show
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
11965 次 |
| 最近记录: |