Spark Dataframe groupBy并将结果排序到列表中

use*_*965 11 dataframe apache-spark apache-spark-sql

我有一个Spark Dataframe,我想用一个键对元素进行分组,并将结果作为一个排序列表

目前我正在使用:

df.groupBy("columnA").agg(collect_list("columnB"))

如何使列表中的项目按升序排序?

Dan*_*ula 22

您可以尝试函数包中sort_array提供的函数:

import org.apache.spark.sql.functions._
df.groupBy("columnA").agg(sort_array(collect_list("columnB")))
Run Code Online (Sandbox Code Playgroud)

  • 如何根据同一个`df`中的不同列对`collect_list()`中的元素进行排序? (19认同)
  • @vdep参见[另一个答案](/sf/answers/3809881141/) (2认同)

Goo*_*Dok 7

只是想为Daniel de Paula关于sort_array解决方案的答案添加另一个提示。

如果要根据不同的列对元素进行排序,则可以形成两个字段的结构:

  • 排序
  • 结果现场

由于结构是按字段进行排序的,因此您将获得所需的顺序,因此您所需要的就是摆脱结果列表中每个元素的排序
必要时,可以将相同的方法应用于列进行的几种排序

这是一个可以在本地(使用模式)下运行的示例spark-shell:paste

import org.apache.spark.sql.Row
import spark.implicits._

case class Employee(name: String, department: String, salary: Double)

val employees = Seq(
  Employee("JSMITH", "A", 20.0),
  Employee("AJOHNSON", "A", 650.0),
  Employee("CBAKER", "A", 650.2),
  Employee("TGREEN", "A", 13.0),
  Employee("CHORTON", "B", 111.0),
  Employee("AIVANOV", "B", 233.0),
  Employee("VSMIRNOV", "B", 11.0)
)

val employeesDF = spark.createDataFrame(employees)

val getNames = udf { salaryNames: Seq[Row] =>
  salaryNames.map { case Row(_: Double, name: String) => name }
}

employeesDF
  .groupBy($"department")
  .agg(collect_list(struct($"salary", $"name")).as("salaryNames"))
  .withColumn("namesSortedBySalary", getNames(sort_array($"salaryNames", asc = false)))
  .show(truncate = false)
Run Code Online (Sandbox Code Playgroud)

结果:

+----------+--------------------------------------------------------------------+----------------------------------+
|department|salaryNames                                                         |namesSortedBySalary               |
+----------+--------------------------------------------------------------------+----------------------------------+
|B         |[[111.0, CHORTON], [233.0, AIVANOV], [11.0, VSMIRNOV]]              |[AIVANOV, CHORTON, VSMIRNOV]      |
|A         |[[20.0, JSMITH], [650.0, AJOHNSON], [650.2, CBAKER], [13.0, TGREEN]]|[CBAKER, AJOHNSON, JSMITH, TGREEN]|
+----------+--------------------------------------------------------------------+----------------------------------+
Run Code Online (Sandbox Code Playgroud)