Spark按照值排序集合

use*_*916 35 sorting word-count apache-spark

我正在尝试本教程http://spark.apache.org/docs/latest/quick-start.html 我首先从文件创建了一个集合

textFile = sc.textFile("README.md")
Run Code Online (Sandbox Code Playgroud)

然后我尝试了一个命令来解决这些问题:

wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
Run Code Online (Sandbox Code Playgroud)

要打印集合:

 wordCounts.collect()
Run Code Online (Sandbox Code Playgroud)

我找到了如何使用命令sortByKey逐字排序.我想知道如何通过值进行排序可以做同样的事情,在这种情况下,文档中出现一个单词的数字.

小智 32

排序通常应该在调用collect()之前完成,因为它会将数据集返回给驱动程序,这也就是在java中编写hadoop map-reduce作业的方式,以便编写所需的最终输出(通常)到HDFS.使用spark API,这种方法可以灵活地以"原始"形式将输出写入您想要的位置,例如可以将其用作进一步处理的输入的文件.

使用spark的scala API在collect()之前进行排序可以按照eliasah的建议完成并使用Tuple2.swap()两次,一次在排序之前和之后一次,以便生成按第二个字段的递增或递减顺序排序的元组列表(其中名为_2)并包含第一个字段(名为_1)中的字数.下面是一个如何在spark-shell中编写脚本的示例:

// this whole block can be pasted in spark-shell in :paste mode followed by <Ctrl>D
val file = sc.textFile("some_local_text_file_pathname")
val wordCounts = file.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 1)  // 2nd arg configures one task (same as number of partitions)
  .map(item => item.swap) // interchanges position of entries in each tuple
  .sortByKey(true, 1) // 1st arg configures ascending sort, 2nd arg configures one task
  .map(item => item.swap)
Run Code Online (Sandbox Code Playgroud)

为了颠倒排序的顺序,使用sortByKey(false,1),因为它的第一个arg是升序的布尔值.它的第二个参数是任务数(等于分区数),它被设置为1,用于测试一个只需要一个输出数据文件的小输入文件; reduceByKey也接受这个可选参数.

在此之后,wordCounts RDD可以作为文本文件保存到具有saveAsTextFile(directory_pathname)的目录中,其中将存放一个或多个part-xxxxx文件(从part-00000开始),具体取决于为作业配置的reducer的数量(1每个reducer输出数据文件),_SUCCESS文件取决于作业是否成功以及.crc文件.

使用pyspark一个非常类似于上面显示的scala脚本的python脚本产生的输出实际上是相同的.以下是pyspark版本,演示按值排序集合:

file = sc.textFile("file:some_local_text_file_pathname")
wordCounts = file.flatMap(lambda line: line.strip().split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b, 1) \ # last arg configures one reducer task
    .map(lambda (a, b): (b, a)) \
    .sortByKey(1, 1) \ # 1st arg configures ascending sort, 2nd configures 1 task
    .map(lambda (a, b): (b, a))
Run Code Online (Sandbox Code Playgroud)

为了按降序排序bybyKey,它的第一个arg应为0.由于python捕获前导和尾随空格作为数据,因此在分割空格上的每一行之前插入strip(),但这不是必须使用spark-shell/scala.

wordCount的spark和python版本输出的主要区别在于spark输出(word,3)python输出(u'word',3).

有关spark RDD方法的更多信息,请参阅http://spark.apache.org/docs/1.1.0/api/python/pyspark.rdd.RDD-class.html for python和https://spark.apache.org/ scala的docs/latest/api/scala /#org.apache.spark.rdd.RDD.

在spark-shell中,在wordCounts上运行collect()将它从RDD转换为Array [(String,Int)] = Array [Tuple2(String,Int)],它本身可以在每个Tuple2元素的第二个字段上排序使用:

Array.sortBy(_._2) 
Run Code Online (Sandbox Code Playgroud)

sortBy还采用了一个可选的隐式math.Ordering参数,如Romeo Kienzler在前一个问题的答案中显示.Array.sortBy(_._ 2)将在其_2字段上对Array Tuple2元素进行反向排序,只需在运行map-reduce脚本之前定义隐式反向排序,因为它会覆盖Int的预先存在的排序.反向int由Romeo Kienzler定义的订购是:

// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
  override def compare(a: Int, b: Int) = a.compare(b)*(-1)
}
Run Code Online (Sandbox Code Playgroud)

定义此反向排序的另一种常用方法是反转a和b的顺序并删除比较定义右侧的(-1):

// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
  override def compare(a: Int, b: Int) = b.compare(a)
}   
Run Code Online (Sandbox Code Playgroud)


kir*_*an6 20

以更加pythonic的方式做它.

# In descending order
''' The first parameter tells number of elements
    to be present in output.
''' 
data.takeOrdered(10, key=lambda x: -x[1])
# In Ascending order
data.takeOrdered(10, key=lambda x: x[1])
Run Code Online (Sandbox Code Playgroud)


oke*_*llo 6

对于那些希望按值排序的前N个元素的人:

theRDD.takeOrdered(N, lambda (key, value): -1 * len(value))
Run Code Online (Sandbox Code Playgroud)

如果您希望按字符串长度排序.

另一方面,如果值已经采用适合您所需订购的形式,那么:

theRDD.takeOrdered(N, lambda (key, value): -1 * value)
Run Code Online (Sandbox Code Playgroud)

就够了


Rom*_*ler 5

你可以这样

// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
    override def compare(a: Int, b: Int) = a.compare(b)*(-1)
}

counts.collect.toSeq.sortBy(_._2)
Run Code Online (Sandbox Code Playgroud)

因此,基本上,您可以将RDD转换为序列,并使用sort方法对其进行排序。

上面的代码块全局更改了排序行为,以获取降序排序。


sta*_*wer 5

我认为您可以使用此处sortBy记录的通用转换(不是操作,即它返回 RDD 而不是数组)。

所以在你的情况下,你可以做

wordCounts.sortBy(lambda (word, count): count)
Run Code Online (Sandbox Code Playgroud)


use*_*916 1

我设法使用Python解决了这个问题。因此,我创建了一个值对列表并按值对其进行排序:

out = wordCounts.collect()
outSort = sorted(out, key=lambda word:word[1])
Run Code Online (Sandbox Code Playgroud)

  • 您将所有结果收集回驱动程序并在那里进行排序。它会起作用,但前提是您的结果集相对较小。如需大规模运行的解决方案,请参阅 eliasah 的解决方案。 (9认同)
  • 这并不能解决大数据的问题。如果数据很小,为什么还需要 Spark。 (2认同)