如何找到RDD的大小

sag*_*sag 23 apache-spark apache-spark-sql

我有RDD[Row],需要持久保存到第三方存储库.但是这个第三方存储库在一次调用中最多接受5 MB.

所以我想根据RDD中存在的数据大小创建分区,而不是基于RDD中存在的行数.

如何找到a的大小RDD并根据它创建分区?

sag*_*sag 12

正如Justin和Wang所说,获得RDD的大小并不是直截了当的.我们可以做一个估计.

我们可以对RDD进行采样,然后使用SizeEstimator来获取样本的大小.正如Wang和Justin所提到的,基于离线采样的大小数据,比如,X行使用Y GB离线,Z行在运行时可能需要Z*Y/X GB

以下是获取RDD大小/估计值的示例scala代码.

我是scala和spark的新手.下面的示例可能会以更好的方式编写

def getTotalSize(rdd: RDD[Row]): Long = {
  // This can be a parameter
  val NO_OF_SAMPLE_ROWS = 10l;
  val totalRows = rdd.count();
  var totalSize = 0l
  if (totalRows > NO_OF_SAMPLE_ROWS) {
    val sampleRDD = rdd.sample(true, NO_OF_SAMPLE_ROWS)
    val sampleRDDSize = getRDDSize(sampleRDD)
    totalSize = sampleRDDSize.*(totalRows)./(NO_OF_SAMPLE_ROWS)
  } else {
    // As the RDD is smaller than sample rows count, we can just calculate the total RDD size
    totalSize = getRDDSize(rdd)
  }

  totalSize
}

def getRDDSize(rdd: RDD[Row]) : Long = {
    var rddSize = 0l
    val rows = rdd.collect()
    for (i <- 0 until rows.length) {
       rddSize += SizeEstimator.estimate(rows.apply(i).toSeq.map { value => value.asInstanceOf[AnyRef] })
    }

    rddSize
}
Run Code Online (Sandbox Code Playgroud)

  • @SamuelAlexander`rdd.sample(true,NO_OF_SAMPLE_ROWS)`将返回完整的RDD,第二个参数应该是0到1之间的数字 (2认同)

Hai*_*ang 6

一个直接的方法是调用以下,取决于您是否要以序列化形式存储数据,然后转到火花UI"存储"页面,您应该能够计算出RDD的总大小(内存+磁盘):

rdd.persist(StorageLevel.MEMORY_AND_DISK)

or

rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
Run Code Online (Sandbox Code Playgroud)

在运行时计算准确的内存大小并不容易.您可以尝试在运行时进行估算:基于离线采样的大小数据,例如,X行使用Y GB离线,Z行在运行时可以采用Z*Y/X GB; 这与贾斯汀先前提出的相似.

希望这可以帮助.


Yiy*_*ang 5

我认为RDD.count()将为您提供RDD中的元素数量

  • 您好@Yiying,欢迎来到StackOverflow.海报要求RDD的大小,而不仅仅是行数.也许您可以扩展您的答案,以便海报不需要任何进一步的澄清.一旦您有足够的声誉,您就可以根据自己的意愿发表评论. (6认同)
  • 据推测,该问题询问信息单位(字节)的大小。但“计数”也是大小的度量——这个答案并没有真正回答问题,但确实为理想答案添加了信息。 (2认同)