如何在Spark中对RDD进行排序和限制?

eti*_*tig 8 scala apache-spark rdd

我有Foo类的RDD : class Foo( name : String, createDate : Date ). 我想要一个年龄大10%的其他RDD Foo.我的第一个想法是排序createDate和限制0.1*计数,但没有限制功能.

你有个主意吗?

zer*_*323 15

假设Foo是这样的案例类:

import java.sql.Date
case class Foo(name: String, createDate: java.sql.Date)
Run Code Online (Sandbox Code Playgroud)
  1. 使用普通RDD:

    import org.apache.spark.rdd.RDD
    import scala.math.Ordering
    
    val rdd: RDD[Foo] = sc
      .parallelize(Seq(
        ("a", "2015-01-03"), ("b", "2014-11-04"), ("a", "2016-08-10"),
        ("a", "2013-11-11"), ("a", "2015-06-19"), ("a", "2009-11-23")))
      .toDF("name", "createDate")
      .withColumn("createDate", $"createDate".cast("date"))
      .as[Foo].rdd
    
    rdd.cache()
    val  n = scala.math.ceil(0.1 * rdd.count).toInt
    
    Run Code Online (Sandbox Code Playgroud)
  2. 使用DataFrame(注意 - 由于限制行为,这实际上不是最佳性能).

    import org.apache.spark.sql.Row
    
    val topN = rdd.toDF.orderBy($"createDate").limit(n)
    topN.show
    
    // +----+----------+
    // |name|createDate|
    // +----+----------+
    // |   a|2009-11-23|
    // +----+----------+
    
    
    // Optionally recreate RDD[Foo]
    topN.map{case Row(name: String, date: Date) => Foo(name, date)} 
    
    Run Code Online (Sandbox Code Playgroud)

  • 嗨 zero323 你能很快说出为什么 DataFrame 的性能在限制操作上是次优的吗?在实现方面与RDD上的top相比有什么区别?@zero333 (2认同)