Spark排序RDD并加入他们的排名

bxs*_*shi 2 scala apache-spark rdd

我有一个RDD[(VertexId, Double)],我希望对它进行排序,_._2并使用此RDD加入索引(排名).因此,我可以获得一个元素及其排名filter.

目前我对RDD进行排序sortBy,但我不知道如何加入RDD及其排名.所以我把它作为一个序列收集并用它的索引压缩它.但这并不高效.我想知道是否有更优雅的方式来做到这一点.

我现在使用的代码是:

val tmpRes = graph.vertices.sortBy(_._2, ascending = false) // Sort all nodes by its PR score in descending order
      .collect() // collect to master, this may be very expensive

    tmpRes.zip(tmpRes.indices) // zip with index
Run Code Online (Sandbox Code Playgroud)

Oli*_*dot 6

如果任何机会,你想带回驱动程序仅ñ第一元组,那么也许你可以使用takeOrdered(N,[排序]) ,其中ñ是带来结果的回数和订购比较你"我喜欢用.

否则,您可以使用zipWithIndex转换,将您RDD[(VertexId, Double)]转换为RDD[((VertexId, Double), Long)]具有正确索引的转换(当然,您应该在排序后执行此操作).

例如 :

scala> val data = sc.parallelize(List(("A", 1), ("B", 2)))
scala> val sorted = data.sortBy(_._2)
scala> sorted.zipWithIndex.collect()
res1: Array[((String, Int), Long)] = Array(((A,1),0), ((B,2),1))
Run Code Online (Sandbox Code Playgroud)

问候,