如何在Spark窗口函数中使用降序的orderby()?

Mal*_*lte 19 scala apache-spark apache-spark-sql spark-dataframe

我需要一个窗口函数,它按一些键(=列名称)进行分区,按另一个列名称进行排序,并返回前x行的行.

这适用于升序:

def getTopX(df: DataFrame, top_x: String, top_key: String, top_value:String): DataFrame ={
    val top_keys: List[String] = top_key.split(", ").map(_.trim).toList
    val w = Window.partitionBy(top_keys(1),top_keys.drop(1):_*)
       .orderBy(top_value)
    val rankCondition = "rn < "+top_x.toString
    val dfTop = df.withColumn("rn",row_number().over(w))
      .where(rankCondition).drop("rn")
  return dfTop
}
Run Code Online (Sandbox Code Playgroud)

但是当我尝试将其更改为第4行orderBy(desc(top_value))orderBy(top_value.desc)第4行时,我收到语法错误.这里的语法是什么?

Sim*_*Sim 38

有两个版本orderBy,一个适用于字符串,另一个适用于Column对象(API).您的代码使用的是第一个版本,该版本不允许更改排序顺序.您需要切换到列版本然后调用desc方法,例如,myCol.desc.

现在,我们进入API设计领域.传递Column参数的优点是你有更多的灵活性,例如,你可以使用表达式等.如果你想维护一个接受字符串而不是a的API Column,你需要将字符串转换为列.有很多方法可以做到这一点,最容易使用org.apache.spark.sql.functions.col(myColName).

总而言之,我们得到了

.orderBy(org.apache.spark.sql.functions.col(top_value).desc)
Run Code Online (Sandbox Code Playgroud)

  • 当我在 desc 后面没有使用括号时出现错误,不知道为什么。添加后,它就会按照我想要的方式工作,留下 `.orderBy(f.col('col_name_1'),f.col('date'), f.col('col_name_2').desc())` (4认同)
  • @Anne 我知道这是几年后的事了,但我很高兴你发表评论,因为我也在使用 PySpark 并且需要一种方法来让它工作。您的解决方案对我有用,所以谢谢! (2认同)

Sar*_*avu 8

例如,如果我们需要Date在 Window 函数中按降序调用的列进行排序,请$在列名之前使用符号,这将使我们能够使用ascordesc语法。

Window.orderBy($"Date".desc)
Run Code Online (Sandbox Code Playgroud)

在双引号中指定列名后,给出.desc将按降序排序。