限制 Spark 上下文中的记录数量

Uri*_*ren 2 python hadoop apache-spark rdd pyspark

我想减少每个减速器的记录数量,并将结果变量保留为rdd

使用takeSample似乎是显而易见的选择,但是,它返回一个collection而不是一个SparkContext对象。

我想出了这个方法:

rdd = rdd.zipWithIndex().filter(lambda x:x[1]<limit).map(lambda x:x[0])
Run Code Online (Sandbox Code Playgroud)

然而,这种方法非常慢并且效率不高。

有没有更聪明的方法来获取小样本并保持数据结构rdd

zer*_*323 5

如果您想要一个小的示例子集并且不能对数据做出任何额外的假设,那么take与之结合parallelize可以是最佳解决方案:

sc.parallelize(rdd.take(n))
Run Code Online (Sandbox Code Playgroud)

它将涉及相对较少数量的分区(在最好的情况下只有一个),并且小n的网络流量成本应该可以忽略不计。

采样(randomSplit或)将需要与sample相同的完整数据扫描。zipWithIndexfilter

假设没有数据倾斜,您可以尝试这样的方法来解决这个问题:

from __future__ import division  # Python 2 only

def limitApprox(rdd, n, timeout):
    count = rdd.countApprox(timeout)
    if count <= n:
        return rdd
    else:
        rec_per_part = count // rdd.getNumPartitions()
        required_parts = n / rec_per_part if rec_per_part else 1
        return rdd.mapPartitionsWithIndex(
            lambda i, iter: iter if i < required_parts else []
        )
Run Code Online (Sandbox Code Playgroud)
  • 这仍然会访问每个分区,但如果没有必要,会尝试避免计算内容
  • 如果存在较大的数据偏差,则不起作用
    • 如果分布均匀但 n << 每个分区的平均记录数,则可能需要比所需的多得多的数据。
    • 如果分布偏向高指数,则可能会采样不足。

如果数据可以表示为 aRow你可以尝试另一个技巧:

rdd.toDF().limit(n).rdd
Run Code Online (Sandbox Code Playgroud)