thi*_*bee 4 scala apache-spark apache-spark-sql
我想sum使用SparkSQL在数组列上(或执行其他聚合函数).
我有一张桌子
+-------+-------+---------------------------------+
|dept_id|dept_nm| emp_details|
+-------+-------+---------------------------------+
| 10|Finance| [100, 200, 300, 400, 500]|
| 20| IT| [10, 20, 50, 100]|
+-------+-------+---------------------------------+
Run Code Online (Sandbox Code Playgroud)
我想总结一下这个emp_details专栏的价值.
预期查询:
sqlContext.sql("select sum(emp_details) from mytable").show
Run Code Online (Sandbox Code Playgroud)
预期结果
1500
180
Run Code Online (Sandbox Code Playgroud)
此外,我应该能够总结范围元素:
sqlContext.sql("select sum(slice(emp_details,0,3)) from mytable").show
Run Code Online (Sandbox Code Playgroud)
结果
600
80
Run Code Online (Sandbox Code Playgroud)
当对数组类型进行求和时,它表示sum预期参数为数字类型而不是数组类型.
我认为我们需要为此创建UDF.但怎么样?
我是否会面临使用UDF的任何性能命中?除了UDF之外还有其他解决方案吗?
Jac*_*ski 13
从Spark 2.4开始,Spark SQL支持用于处理复杂数据结构(包括数组)的高阶函数.
"现代"解决方案如下:
scala> input.show(false)
+-------+-------+-------------------------+
|dept_id|dept_nm|emp_details |
+-------+-------+-------------------------+
|10 |Finance|[100, 200, 300, 400, 500]|
|20 |IT |[10, 20, 50, 100] |
+-------+-------+-------------------------+
input.createOrReplaceTempView("mytable")
val sqlText = "select dept_id, dept_nm, aggregate(emp_details, 0, (acc, value) -> acc + value) as sum from mytable"
scala> sql(sqlText).show
+-------+-------+----+
|dept_id|dept_nm| sum|
+-------+-------+----+
| 10|Finance|1500|
| 20| IT| 180|
+-------+-------+----+
Run Code Online (Sandbox Code Playgroud)
您可以在以下文章和视频中找到关于高阶函数的好读物:
免责声明由于Spark SQL执行的反序列化,我不推荐这种方法(尽管它得到了最多的赞成)Dataset.map.该查询强制Spark反序列化数据并将其加载到JVM(从JVM外部的Spark管理的内存区域).这将不可避免地导致更频繁的GC,从而使性能变差.
一种解决方案是使用Dataset解决方案,其中Spark SQL和Scala的组合可以显示其功能.
scala> val inventory = Seq(
| (10, "Finance", Seq(100, 200, 300, 400, 500)),
| (20, "IT", Seq(10, 20, 50, 100))).toDF("dept_id", "dept_nm", "emp_details")
inventory: org.apache.spark.sql.DataFrame = [dept_id: int, dept_nm: string ... 1 more field]
// I'm too lazy today for a case class
scala> inventory.as[(Long, String, Seq[Int])].
map { case (deptId, deptName, details) => (deptId, deptName, details.sum) }.
toDF("dept_id", "dept_nm", "sum").
show
+-------+-------+----+
|dept_id|dept_nm| sum|
+-------+-------+----+
| 10|Finance|1500|
| 20| IT| 180|
+-------+-------+----+
Run Code Online (Sandbox Code Playgroud)
我将切片部分作为练习,因为它同样简单.
explode()它可以在您的列上使用一种可能的方法Array,从而通过唯一键聚合输出。例如:
import sqlContext.implicits._
import org.apache.spark.sql.functions._
(mytable
.withColumn("emp_sum",
explode($"emp_details"))
.groupBy("dept_nm")
.agg(sum("emp_sum")).show)
+-------+------------+
|dept_nm|sum(emp_sum)|
+-------+------------+
|Finance| 1500|
| IT| 180|
+-------+------------+
Run Code Online (Sandbox Code Playgroud)
要仅选择数组中的特定值,我们可以使用链接问题的答案并稍作修改即可应用:
val slice = udf((array : Seq[Int], from : Int, to : Int) => array.slice(from,to))
(mytable
.withColumn("slice",
slice($"emp_details",
lit(0),
lit(3)))
.withColumn("emp_sum",
explode($"slice"))
.groupBy("dept_nm")
.agg(sum("emp_sum")).show)
+-------+------------+
|dept_nm|sum(emp_sum)|
+-------+------------+
|Finance| 600|
| IT| 80|
+-------+------------+
Run Code Online (Sandbox Code Playgroud)
数据:
val data = Seq((10, "Finance", Array(100,200,300,400,500)),
(20, "IT", Array(10,20,50,100)))
val mytable = sc.parallelize(data).toDF("dept_id", "dept_nm","emp_details")
Run Code Online (Sandbox Code Playgroud)
从Spark 2.4开始,您可以使用以下slice功能进行切片:
import org.apache.spark.sql.functions.slice
val df = Seq(
(10, "Finance", Seq(100, 200, 300, 400, 500)),
(20, "IT", Seq(10, 20, 50, 100))
).toDF("dept_id", "dept_nm", "emp_details")
val dfSliced = df.withColumn(
"emp_details_sliced",
slice($"emp_details", 1, 3)
)
dfSliced.show(false)
Run Code Online (Sandbox Code Playgroud)
import org.apache.spark.sql.functions.slice
val df = Seq(
(10, "Finance", Seq(100, 200, 300, 400, 500)),
(20, "IT", Seq(10, 20, 50, 100))
).toDF("dept_id", "dept_nm", "emp_details")
val dfSliced = df.withColumn(
"emp_details_sliced",
slice($"emp_details", 1, 3)
)
dfSliced.show(false)
Run Code Online (Sandbox Code Playgroud)
和数组与aggregate:
dfSliced.selectExpr(
"*",
"aggregate(emp_details, 0, (x, y) -> x + y) as details_sum",
"aggregate(emp_details_sliced, 0, (x, y) -> x + y) as details_sliced_sum"
).show
Run Code Online (Sandbox Code Playgroud)
+-------+-------+-------------------------+------------------+
|dept_id|dept_nm|emp_details |emp_details_sliced|
+-------+-------+-------------------------+------------------+
|10 |Finance|[100, 200, 300, 400, 500]|[100, 200, 300] |
|20 |IT |[10, 20, 50, 100] |[10, 20, 50] |
+-------+-------+-------------------------+------------------+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
6013 次 |
| 最近记录: |