Spark:删除少于N次的行

Mpi*_*ris 1 apache-spark pyspark

让我们说我有以下rdd:

a = [('a',1),('a',2),('a',3),('b',1),('b',4),('c',3)]
anRDD = sc.parallelize(a)
Run Code Online (Sandbox Code Playgroud)

而且我想让他们键出现的行超过N次(这个例子可以说大于或等于2).在另一个rdd排除的那些.

我所做的是以下内容:

threshold = 2
anRDD.persist()
grouped_counts = anRDD
                    .toDF(['letter','number'])
                    .groupBy('letter')
                    .count()

downFromLimit = grouped_counts.filter(grouped_counts['count']<threshold).select("letter").map(lambda x:x.letter).collect()  
upTheLimit = grouped_counts.filter(grouped_counts['count']>=threshold).select("letter").map(lambda x:x.letter).collect()

upData = anRDD.filter(lambda x:x[0] in upTheLimit)
downData = anRDD.filter(lambda x:x[0] in downFromLimit)
anRDD.unpersist()
Run Code Online (Sandbox Code Playgroud)

它做我想要的,但它应该比这更清晰,更容易和更有效.

如果我使用reduceByKey并计算值的长度会更有益吗?

还有其他想法吗?

Rya*_*ier 5

你的方法和Alberto都把所有的钥匙都拉回到司机身上,如果你有很多钥匙就会出问题.

而不是这样做,我将创建聚合DF,然后将其与原始数据相连.然后,您可以在写入时使用分区来一次性保存两个组.

您应尽可能将所有数据保存在DataFrame中,而不是RDD.使用DataFrame时有大量优化,尤其适用于使用pyspark时.

from pyspark.sql import functions as F    

df = anRDD.toDF(['letter','number'])

counts = df.groupBy('letter') \
           .count()

# Join to the original data
df = df.join(counts, df.letter == counts.letter)

# Add the column to partition on
updated = df.withColumn('group', F.when(df.count < 2, 'down').otherwise('up')) \
           .select('letter', 'group')

# Partitioning lets us write out both groups at once, instead of recomputing stages
updated.write.partitionBy('group').text('/some/output/path')
Run Code Online (Sandbox Code Playgroud)

这将为您创建一个文件夹结构,如下所示:

/some/output/path
     group=down
          part-0000
          ...
     group=up
          part-0000
          ...
Run Code Online (Sandbox Code Playgroud)