Mpi*_*ris 1 apache-spark pyspark
让我们说我有以下rdd:
a = [('a',1),('a',2),('a',3),('b',1),('b',4),('c',3)]
anRDD = sc.parallelize(a)
Run Code Online (Sandbox Code Playgroud)
而且我想让他们键出现的行超过N次(这个例子可以说大于或等于2).在另一个rdd排除的那些.
我所做的是以下内容:
threshold = 2
anRDD.persist()
grouped_counts = anRDD
.toDF(['letter','number'])
.groupBy('letter')
.count()
downFromLimit = grouped_counts.filter(grouped_counts['count']<threshold).select("letter").map(lambda x:x.letter).collect()
upTheLimit = grouped_counts.filter(grouped_counts['count']>=threshold).select("letter").map(lambda x:x.letter).collect()
upData = anRDD.filter(lambda x:x[0] in upTheLimit)
downData = anRDD.filter(lambda x:x[0] in downFromLimit)
anRDD.unpersist()
Run Code Online (Sandbox Code Playgroud)
它做我想要的,但它应该比这更清晰,更容易和更有效.
如果我使用reduceByKey
并计算值的长度会更有益吗?
还有其他想法吗?
你的方法和Alberto都把所有的钥匙都拉回到司机身上,如果你有很多钥匙就会出问题.
而不是这样做,我将创建聚合DF,然后将其与原始数据相连.然后,您可以在写入时使用分区来一次性保存两个组.
您应尽可能将所有数据保存在DataFrame中,而不是RDD.使用DataFrame时有大量优化,尤其适用于使用pyspark时.
from pyspark.sql import functions as F
df = anRDD.toDF(['letter','number'])
counts = df.groupBy('letter') \
.count()
# Join to the original data
df = df.join(counts, df.letter == counts.letter)
# Add the column to partition on
updated = df.withColumn('group', F.when(df.count < 2, 'down').otherwise('up')) \
.select('letter', 'group')
# Partitioning lets us write out both groups at once, instead of recomputing stages
updated.write.partitionBy('group').text('/some/output/path')
Run Code Online (Sandbox Code Playgroud)
这将为您创建一个文件夹结构,如下所示:
/some/output/path
group=down
part-0000
...
group=up
part-0000
...
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
1826 次 |
最近记录: |