Ame*_*url 5 partitioning apache-spark
我不太清楚范围分区器在Spark中是如何工作的.它使用(水库采样)取样.我对计算输入边界的方式感到困惑.
// This is the sample size we need to have roughly balanced output partitions, capped at 1M.
val sampleSize = math.min(20.0 * partitions, 1e6)
// Assume the input partitions are roughly balanced and over-sample a little bit.
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
Run Code Online (Sandbox Code Playgroud)
为什么计算的sampleSize应该乘以3.0?以及如何获得边界?有人能告诉我一些这方面的例子吗?谢谢!
Tim*_*Tim 13
您发布的代码来自用于获取未分区RDD并通过新范围分区程序对其进行分区的方法.这包括三个步骤:
K到分区索引的功能您的问题涉及第一步.理想情况下,您可以只收集所有RDD数据,对其进行排序,并确定将已排序集合划分为nPartitions块的范围界限.简单!
没那么多.该算法在计算中是O(n log n),并且需要与集合成比例的存储器.这些事实(特别是第二个)使得在分布式Spark框架中执行变得不切实际.但是我们不需要我们的分区完全平衡,因为它们将在我可怕的收集和排序实现之后.只要我们的分区最终达到合理平衡,我们就明白了.如果我们可以使用给出近似分位数边界但运行速度更快的算法,这可能是一个胜利.
好的,所以我们有动力拥有一个快速运行并且不占用太多内存的高效算法.水库采样证明是一种很好的方法.如果你的收藏品有1B元素并且你采样1M,你的1M元素的第10个百分位大约等于你的1B的第10个百分位数.您可以使用完全相同的收集和排序算法来确定范围界限,但是在完整数据的减少的随机采样子集上.
第一行(sampleSize)估计充分表示值的真实范围所需的样本数.这有点武断,可能基于反复试验.但是,由于您希望并行采样,因此您需要知道每个分布式采用partition多少值,而不是总体采用多少值.第二行(sampleSizePerPartition)估计这个数字.
之前我提到过我们希望分区大致平衡.这是因为大量Spark函数依赖于此属性 - sampleSizePerPartition包含的代码.我们知道分区大小略有不同,但假设它们变化不大.通过从每个分区中抽取比我们完全平衡所需的值多3倍的值,我们可以容忍更多的分区不平衡.
考虑如果您有100,000个分区会发生什么.在这种情况下,sampleSize是200万(20*分区)
如果从每个分区中获取20个随机元素,那么如果任何分区的元素少于20个,那么最终会得到的样本数少于sampleSize.从每个分区中获取60个元素是积极的,但确保除了最极端的不平衡分区方案之外,您将获得足够的样本.
| 归档时间: |
|
| 查看次数: |
6559 次 |
| 最近记录: |