add*_*ing 6 scala apache-spark apache-spark-sql
给出以下代码:
import java.sql.Date
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object SortQuestion extends App{
val spark = SparkSession.builder().appName("local").master("local[*]").getOrCreate()
import spark.implicits._
case class ABC(a: Int, b: Int, c: Int)
val first = Seq(
ABC(1, 2, 3),
ABC(1, 3, 4),
ABC(2, 4, 5),
ABC(2, 5, 6)
).toDF("a", "b", "c")
val second = Seq(
(1, 2, (Date.valueOf("2018-01-02"), 30)),
(1, 3, (Date.valueOf("2018-01-01"), 20)),
(2, 4, (Date.valueOf("2018-01-02"), 50)),
(2, 5, (Date.valueOf("2018-01-01"), 60))
).toDF("a", "b", "c")
first.join(second.withColumnRenamed("c", "c2"), Seq("a", "b")).groupBy("a").agg(sort_array(collect_list("c2")))
.show(false)
}
Run Code Online (Sandbox Code Playgroud)
Spark产生以下结果:
import java.sql.Date
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object SortQuestion extends App{
val spark = SparkSession.builder().appName("local").master("local[*]").getOrCreate()
import spark.implicits._
case class ABC(a: Int, b: Int, c: Int)
val first = Seq(
ABC(1, 2, 3),
ABC(1, 3, 4),
ABC(2, 4, 5),
ABC(2, 5, 6)
).toDF("a", "b", "c")
val second = Seq(
(1, 2, (Date.valueOf("2018-01-02"), 30)),
(1, 3, (Date.valueOf("2018-01-01"), 20)),
(2, 4, (Date.valueOf("2018-01-02"), 50)),
(2, 5, (Date.valueOf("2018-01-01"), 60))
).toDF("a", "b", "c")
first.join(second.withColumnRenamed("c", "c2"), Seq("a", "b")).groupBy("a").agg(sort_array(collect_list("c2")))
.show(false)
}
Run Code Online (Sandbox Code Playgroud)
这意味着 Spark 按日期对数组进行排序(因为它是第一列),但我想指示 Spark 按该数组中的特定列排序。我知道我可以将数组重塑为,(value, date)
但似乎不方便,我想要一个通用的解决方案(假设我有一个大嵌套结构,5 层深,我想按特定列对该结构进行排序)。有没有办法做到这一点?我错过了什么吗?
根据蜂巢维基:
sort_array(Array<T>)
:根据数组元素的自然顺序对输入数组进行升序排序并返回(从0.9.0版本开始)。
这意味着数组将按字典顺序排序,即使对于复杂的数据类型也是如此。
或者,您可以创建一个 UDF 以根据第二个元素对其进行排序(并见证性能下降):
val sortUdf = udf { (xs: Seq[Row]) => xs.sortBy(_.getAs[Int](1) )
.map{ case Row(x:java.sql.Date, y: Int) => (x,y) }}
first.join(second.withColumnRenamed("c", "c2"), Seq("a", "b"))
.groupBy("a")
.agg(sortUdf(collect_list("c2")))
.show(false)
//+---+----------------------------------+
//|a |UDF(collect_list(c2, 0, 0)) |
//+---+----------------------------------+
//|1 |[[2018-01-01,20], [2018-01-02,30]]|
//|2 |[[2018-01-02,50], [2018-01-01,60]]|
//+---+----------------------------------+
Run Code Online (Sandbox Code Playgroud)
对于 Spark 3+,您可以将自定义比较器函数传递给array_sort
:
比较器将采用两个参数来表示数组的两个元素。当第一个元素小于、等于或大于第二个元素时,它返回 -1、0 或 1。如果比较器函数返回其他值(包括 null),该函数将失败并引发错误。
val df = first
.join(second.withColumnRenamed("c", "c2"), Seq("a", "b"))
.groupBy("a")
.agg(collect_list("c2").alias("list"))
val df2 = df.withColumn(
"list",
expr(
"array_sort(list, (left, right) -> case when left._2 < right._2 then -1 when left._2 > right._2 then 1 else 0 end)"
)
)
df2.show(false)
//+---+------------------------------------+
//|a |list |
//+---+------------------------------------+
//|1 |[[2018-01-01, 20], [2018-01-02, 30]]|
//|2 |[[2018-01-02, 50], [2018-01-01, 60]]|
//+---+------------------------------------+
Run Code Online (Sandbox Code Playgroud)
_2
您想要用于排序的结构体字段的名称在哪里
如果您有复杂的对象,那么使用静态类型会更好Dataset
。
case class Result(a: Int, b: Int, c: Int, c2: (java.sql.Date, Int))
val joined = first.join(second.withColumnRenamed("c", "c2"), Seq("a", "b"))
joined.as[Result]
.groupByKey(_.a)
.mapGroups((key, xs) => (key, xs.map(_.c2).toSeq.sortBy(_._2)))
.show(false)
// +---+----------------------------------+
// |_1 |_2 |
// +---+----------------------------------+
// |1 |[[2018-01-01,20], [2018-01-02,30]]|
// |2 |[[2018-01-02,50], [2018-01-01,60]]|
// +---+----------------------------------+
Run Code Online (Sandbox Code Playgroud)
在简单的情况下,也可以这样做udf
,但通常会导致代码效率低下且脆弱,并且当对象的复杂性增加时,代码很快就会失控。
归档时间: |
|
查看次数: |
17306 次 |
最近记录: |