获取 Spark 中 RDD 中每个键的前 3 个值

Sup*_*ans 3 lambda python-2.7 apache-spark rdd pyspark

我是 Spark 的初学者,我正在尝试创建一个 RDD,其中包含每个键的前 3 个值(不仅仅是前 3 个值)。我当前的 RDD 包含以下格式的数千个条目:

(key, String, value)
Run Code Online (Sandbox Code Playgroud)

所以想象一下我有一个内容如下的 RDD:

[("K1", "aaa", 6), ("K1", "bbb", 3), ("K1", "ccc", 2), ("K1", "ddd", 9),
("B1", "qwe", 4), ("B1", "rty", 7), ("B1", "iop", 8), ("B1", "zxc", 1)]
Run Code Online (Sandbox Code Playgroud)

我目前可以像这样显示 RDD 中的前 3 个值:

("K1", "ddd", 9)
("B1", "iop", 8)
("B1", "rty", 7)
Run Code Online (Sandbox Code Playgroud)

使用:

top3RDD = rdd.takeOrdered(3, key = lambda x: x[2])
Run Code Online (Sandbox Code Playgroud)

相反,我想要的是为 RDD 中的每个键收集前 3 个值,所以我想返回这个:

("K1", "ddd", 9)
("K1", "aaa", 6)
("K1", "bbb", 3)
("B1", "iop", 8)
("B1", "rty", 7)
("B1", "qwe", 4)
Run Code Online (Sandbox Code Playgroud)

Psi*_*dom 5

您需要 groupBy key,然后您可以使用heapq.nlargest从每个组中获取前 3 个值:

from heapq import nlargest
rdd.groupBy(
    lambda x: x[0]
).flatMap(
    lambda g: nlargest(3, g[1], key=lambda x: x[2])
).collect()

[('B1', 'iop', 8), 
 ('B1', 'rty', 7), 
 ('B1', 'qwe', 4), 
 ('K1', 'ddd', 9), 
 ('K1', 'aaa', 6), 
 ('K1', 'bbb', 3)]
Run Code Online (Sandbox Code Playgroud)