mgo*_*ser 13 hadoop apache-spark
我在hdfs中有一个文件,它分布在集群中的节点上.
我正试图从这个文件中随机抽取10行.
在pyspark shell中,我使用以下命令将文件读入RDD:
>>> textFile = sc.textFile("/user/data/myfiles/*")
Run Code Online (Sandbox Code Playgroud)
然后我想简单地拿一个样本......关于Spark的一个很酷的事情是有类似的命令takeSample
,不幸的是我认为我做错了,因为以下需要很长时间:
>>> textFile.takeSample(False, 10, 12345)
Run Code Online (Sandbox Code Playgroud)
所以我尝试在每个节点上创建一个分区,然后使用以下命令指示每个节点对该分区进行采样:
>>> textFile.partitionBy(4).mapPartitions(lambda blockOfLines: blockOfLines.takeSample(False, 10, 1234)).first()
Run Code Online (Sandbox Code Playgroud)
但这会给出一个错误ValueError: too many values to unpack
:
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/cloudera/parcels/CDH-5.0.2-1.cdh5.0.2.p0.13/lib/spark/python/pyspark/worker.py", line 77, in main
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/cloudera/parcels/CDH-5.0.2-1.cdh5.0.2.p0.13/lib/spark/python/pyspark/serializers.py", line 117, in dump_stream
for obj in iterator:
File "/opt/cloudera/parcels/CDH-5.0.2-1.cdh5.0.2.p0.13/lib/spark/python/pyspark/rdd.py", line 821, in add_shuffle_key
for (k, v) in iterator:
ValueError: too many values to unpack
Run Code Online (Sandbox Code Playgroud)
如何使用spark或pyspark从大型分布式数据集中采样10行?
aar*_*man 27
请尝试使用textFile.sample(false,fraction,seed)
.takeSample
通常会非常慢,因为它调用count()
RDD.它需要这样做,因为否则它不会从每个分区均匀地获取,基本上它使用计数以及您要求的样本大小来计算分数和sample
内部调用.sample
很快,因为它只使用一个随机布尔生成器,返回真正fraction
的时间百分比,因此不需要调用count
.
另外,我不认为这发生在你身上,但是如果返回的样本量不够大,它会sample
再次调用,这显然会减慢速度.既然您应该对数据的大小有所了解,我建议您调用样本,然后自己将样本缩小到适当的大小,因为您对数据的了解比Spark更多.
mgo*_*ser 16
使用样本代替takeSample似乎可以使事情变得相当快:
textFile.sample(False, .0001, 12345)
Run Code Online (Sandbox Code Playgroud)
这个问题是除非你大致了解数据集中的行数,否则很难知道选择正确的分数.
归档时间: |
|
查看次数: |
23158 次 |
最近记录: |