Uri*_*ren 2 python hadoop apache-spark rdd pyspark
我想减少每个减速器的记录数量,并将结果变量保留为rdd
使用takeSample
似乎是显而易见的选择,但是,它返回一个collection
而不是一个SparkContext
对象。
我想出了这个方法:
rdd = rdd.zipWithIndex().filter(lambda x:x[1]<limit).map(lambda x:x[0])
Run Code Online (Sandbox Code Playgroud)
然而,这种方法非常慢并且效率不高。
有没有更聪明的方法来获取小样本并保持数据结构rdd
?
如果您想要一个小的示例子集并且不能对数据做出任何额外的假设,那么take
与之结合parallelize
可以是最佳解决方案:
sc.parallelize(rdd.take(n))
Run Code Online (Sandbox Code Playgroud)
它将涉及相对较少数量的分区(在最好的情况下只有一个),并且小n的网络流量成本应该可以忽略不计。
采样(randomSplit
或)将需要与sample
相同的完整数据扫描。zipWithIndex
filter
假设没有数据倾斜,您可以尝试这样的方法来解决这个问题:
from __future__ import division # Python 2 only
def limitApprox(rdd, n, timeout):
count = rdd.countApprox(timeout)
if count <= n:
return rdd
else:
rec_per_part = count // rdd.getNumPartitions()
required_parts = n / rec_per_part if rec_per_part else 1
return rdd.mapPartitionsWithIndex(
lambda i, iter: iter if i < required_parts else []
)
Run Code Online (Sandbox Code Playgroud)
如果数据可以表示为 aRow
你可以尝试另一个技巧:
rdd.toDF().limit(n).rdd
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
9883 次 |
最近记录: |