如何按特定列对 Spark DataFrame 中的结构类型数组进行排序?

add*_*ing 6 scala apache-spark apache-spark-sql

给出以下代码:

import java.sql.Date
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SortQuestion extends App{

  val spark = SparkSession.builder().appName("local").master("local[*]").getOrCreate()
  import spark.implicits._
  case class ABC(a: Int, b: Int, c: Int)

  val first = Seq(
    ABC(1, 2, 3),
    ABC(1, 3, 4),
    ABC(2, 4, 5),
    ABC(2, 5, 6)
  ).toDF("a", "b", "c")

  val second = Seq(
    (1, 2, (Date.valueOf("2018-01-02"), 30)),
    (1, 3, (Date.valueOf("2018-01-01"), 20)),
    (2, 4, (Date.valueOf("2018-01-02"), 50)),
    (2, 5, (Date.valueOf("2018-01-01"), 60))
  ).toDF("a", "b", "c")

  first.join(second.withColumnRenamed("c", "c2"), Seq("a", "b")).groupBy("a").agg(sort_array(collect_list("c2")))
    .show(false)

}
Run Code Online (Sandbox Code Playgroud)

Spark产生以下结果:

import java.sql.Date
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SortQuestion extends App{

  val spark = SparkSession.builder().appName("local").master("local[*]").getOrCreate()
  import spark.implicits._
  case class ABC(a: Int, b: Int, c: Int)

  val first = Seq(
    ABC(1, 2, 3),
    ABC(1, 3, 4),
    ABC(2, 4, 5),
    ABC(2, 5, 6)
  ).toDF("a", "b", "c")

  val second = Seq(
    (1, 2, (Date.valueOf("2018-01-02"), 30)),
    (1, 3, (Date.valueOf("2018-01-01"), 20)),
    (2, 4, (Date.valueOf("2018-01-02"), 50)),
    (2, 5, (Date.valueOf("2018-01-01"), 60))
  ).toDF("a", "b", "c")

  first.join(second.withColumnRenamed("c", "c2"), Seq("a", "b")).groupBy("a").agg(sort_array(collect_list("c2")))
    .show(false)

}
Run Code Online (Sandbox Code Playgroud)

这意味着 Spark 按日期对数组进行排序(因为它是第一列),但我想指示 Spark 按该数组中的特定列排序。我知道我可以将数组重塑为,(value, date)但似乎不方便,我想要一个通用的解决方案(假设我有一个大嵌套结构,5 层深,我想按特定列对该结构进行排序)。有没有办法做到这一点?我错过了什么吗?

phi*_*ert 6

根据蜂巢维基

sort_array(Array<T>) :根据数组元素的自然顺序对输入数组进行升序排序并返回(从0.9.0版本开始)。

这意味着数组将按字典顺序排序,即使对于复杂的数据类型也是如此。

或者,您可以创建一个 UDF 以根据第二个元素对其进行排序(并见证性能下降):

val sortUdf = udf { (xs: Seq[Row]) => xs.sortBy(_.getAs[Int](1) )
                                        .map{ case Row(x:java.sql.Date, y: Int) => (x,y) }}

first.join(second.withColumnRenamed("c", "c2"), Seq("a", "b"))
     .groupBy("a")
     .agg(sortUdf(collect_list("c2")))
     .show(false)

//+---+----------------------------------+
//|a  |UDF(collect_list(c2, 0, 0))       |
//+---+----------------------------------+
//|1  |[[2018-01-01,20], [2018-01-02,30]]|
//|2  |[[2018-01-02,50], [2018-01-01,60]]|
//+---+----------------------------------+
Run Code Online (Sandbox Code Playgroud)

  • 关于数组类型的问题很清楚。sort_array 不适用于结构。您将收到错误:“sort_array 不支持对 struct 类型的数组进行排序”因此您对 sort_array 的评论无效。 (2认同)

bla*_*hop 6

对于 Spark 3+,您可以将自定义比较器函数传递给array_sort

比较器将采用两个参数来表示数组的两个元素。当第一个元素小于、等于或大于第二个元素时,它返回 -1、0 或 1。如果比较器函数返回其他值(包括 null),该函数将失败并引发错误。

val df = first
  .join(second.withColumnRenamed("c", "c2"), Seq("a", "b"))
  .groupBy("a")
  .agg(collect_list("c2").alias("list"))

val df2 = df.withColumn(
  "list",
  expr(
    "array_sort(list, (left, right) -> case when left._2 < right._2 then -1 when left._2 > right._2 then 1 else 0 end)"
  )
)

df2.show(false)
//+---+------------------------------------+
//|a  |list                                |
//+---+------------------------------------+
//|1  |[[2018-01-01, 20], [2018-01-02, 30]]|
//|2  |[[2018-01-02, 50], [2018-01-01, 60]]|
//+---+------------------------------------+
Run Code Online (Sandbox Code Playgroud)

_2您想要用于排序的结构体字段的名称在哪里


hi-*_*zir 3

如果您有复杂的对象,那么使用静态类型会更好Dataset

case class Result(a: Int, b: Int, c: Int, c2: (java.sql.Date, Int))

val joined = first.join(second.withColumnRenamed("c", "c2"), Seq("a", "b"))
joined.as[Result]
  .groupByKey(_.a)
  .mapGroups((key, xs) => (key, xs.map(_.c2).toSeq.sortBy(_._2)))
  .show(false)

// +---+----------------------------------+            
// |_1 |_2                                |
// +---+----------------------------------+
// |1  |[[2018-01-01,20], [2018-01-02,30]]|
// |2  |[[2018-01-02,50], [2018-01-01,60]]|
// +---+----------------------------------+
Run Code Online (Sandbox Code Playgroud)

在简单的情况下,也可以这样做udf,但通常会导致代码效率低下且脆弱,并且当对象的复杂性增加时,代码很快就会失控。