sag*_*sag 23 apache-spark apache-spark-sql
我有RDD[Row]
,需要持久保存到第三方存储库.但是这个第三方存储库在一次调用中最多接受5 MB.
所以我想根据RDD中存在的数据大小创建分区,而不是基于RDD中存在的行数.
如何找到a的大小RDD
并根据它创建分区?
sag*_*sag 12
正如Justin和Wang所说,获得RDD的大小并不是直截了当的.我们可以做一个估计.
我们可以对RDD进行采样,然后使用SizeEstimator来获取样本的大小.正如Wang和Justin所提到的,基于离线采样的大小数据,比如,X行使用Y GB离线,Z行在运行时可能需要Z*Y/X GB
以下是获取RDD大小/估计值的示例scala代码.
我是scala和spark的新手.下面的示例可能会以更好的方式编写
def getTotalSize(rdd: RDD[Row]): Long = {
// This can be a parameter
val NO_OF_SAMPLE_ROWS = 10l;
val totalRows = rdd.count();
var totalSize = 0l
if (totalRows > NO_OF_SAMPLE_ROWS) {
val sampleRDD = rdd.sample(true, NO_OF_SAMPLE_ROWS)
val sampleRDDSize = getRDDSize(sampleRDD)
totalSize = sampleRDDSize.*(totalRows)./(NO_OF_SAMPLE_ROWS)
} else {
// As the RDD is smaller than sample rows count, we can just calculate the total RDD size
totalSize = getRDDSize(rdd)
}
totalSize
}
def getRDDSize(rdd: RDD[Row]) : Long = {
var rddSize = 0l
val rows = rdd.collect()
for (i <- 0 until rows.length) {
rddSize += SizeEstimator.estimate(rows.apply(i).toSeq.map { value => value.asInstanceOf[AnyRef] })
}
rddSize
}
Run Code Online (Sandbox Code Playgroud)
一个直接的方法是调用以下,取决于您是否要以序列化形式存储数据,然后转到火花UI"存储"页面,您应该能够计算出RDD的总大小(内存+磁盘):
rdd.persist(StorageLevel.MEMORY_AND_DISK)
or
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
Run Code Online (Sandbox Code Playgroud)
在运行时计算准确的内存大小并不容易.您可以尝试在运行时进行估算:基于离线采样的大小数据,例如,X行使用Y GB离线,Z行在运行时可以采用Z*Y/X GB; 这与贾斯汀先前提出的相似.
希望这可以帮助.
我认为RDD.count()将为您提供RDD中的元素数量
归档时间: |
|
查看次数: |
36845 次 |
最近记录: |