如何在Scala Spark中对RDD进行排序?

blu*_*sky 34 scala apache-spark rdd

读取Spark方法sortByKey:

sortByKey([ascending], [numTasks])   When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
Run Code Online (Sandbox Code Playgroud)

是否可以返回"N"个数量的结果.因此,不要返回所有结果,只返回前10位.我可以将已排序的集合转换为数组并使用take方法,但由于这是一个O(N)操作,是否有更有效的方法?

Dan*_*bos 51

如果您只需要前10名,请使用rdd.top(10).它避免了排序,所以它更快.

rdd.top使一个并行传递数据,收集堆中每个分区的前N个,然后合并堆.这一个O(rdd.count)操作.排序将是O(rdd.count log rdd.count),并且会产生大量数据传输 - 它会进行随机播放,因此所有数据都将通过网络传输.

  • 我不知道这个方法.这是一个比sort()更好的解决方案.所以这是一个比我更好的答案(虽然它可能提供一些有用的背景).我是在倾诉. (2认同)
  • 不,它不会那样工作.我建议用一个单独的问题来找到关键的前10名,因为这是一个更大的话题. (2认同)

jav*_*dba 19

您很可能已经仔细阅读了源代码:

  class OrderedRDDFunctions {
   // <snip>
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
    val part = new RangePartitioner(numPartitions, self, ascending)
    val shuffled = new ShuffledRDD[K, V, P](self, part)
    shuffled.mapPartitions(iter => {
      val buf = iter.toArray
      if (ascending) {
        buf.sortWith((x, y) => x._1 < y._1).iterator
      } else {
        buf.sortWith((x, y) => x._1 > y._1).iterator
      }
    }, preservesPartitioning = true)
  }
Run Code Online (Sandbox Code Playgroud)

而且,正如你所说,整个数据必须经过洗牌阶段 - 如代码片段所示.

但是,您对随后调用take(K)的担忧可能不那么准确.此操作不会遍历所有N个项目:

  /**
   * Take the first num elements of the RDD. It works by first scanning one partition, and use the
   * results from that partition to estimate the number of additional partitions needed to satisfy
   * the limit.
   */
  def take(num: Int): Array[T] = {
Run Code Online (Sandbox Code Playgroud)

那么,它似乎:

O(myRdd.take(K))<< O(myRdd.sortByKey())〜= O(myRdd.sortByKey.take(k))(至少对于小K)<< O(myRdd.sortByKey().collect ()


jru*_*ren 8

另一个选择,至少来自PySpark 1.2.0,是使用takeOrdered.

按升序排列:

rdd.takeOrdered(10)
Run Code Online (Sandbox Code Playgroud)

按降序排列:

rdd.takeOrdered(10, lambda x: -x)
Run Code Online (Sandbox Code Playgroud)

k,v对的前k个值:

rdd.takeOrdered(10, lambda (k, v): -v)
Run Code Online (Sandbox Code Playgroud)