如何对数组列的元素进行切片和求和?

thi*_*bee 4 scala apache-spark apache-spark-sql

我想sum使用SparkSQL在数组列上(或执行其他聚合函数).

我有一张桌子

+-------+-------+---------------------------------+
|dept_id|dept_nm|                      emp_details|
+-------+-------+---------------------------------+
|     10|Finance|        [100, 200, 300, 400, 500]|
|     20|     IT|                [10, 20, 50, 100]|
+-------+-------+---------------------------------+
Run Code Online (Sandbox Code Playgroud)

我想总结一下这个emp_details专栏的价值.

预期查询:

sqlContext.sql("select sum(emp_details) from mytable").show
Run Code Online (Sandbox Code Playgroud)

预期结果

1500
180
Run Code Online (Sandbox Code Playgroud)

此外,我应该能够总结范围元素:

sqlContext.sql("select sum(slice(emp_details,0,3)) from mytable").show
Run Code Online (Sandbox Code Playgroud)

结果

600
80
Run Code Online (Sandbox Code Playgroud)

当对数组类型进行求和时,它表示sum预期参数为数字类型而不是数组类型.

我认为我们需要为此创建UDF.但怎么样?

我是否会面临使用UDF的任何性能命中?除了UDF之外还有其他解决方案吗?

Jac*_*ski 13

Spark 2.4.0

从Spark 2.4开始,Spark SQL支持用于处理复杂数据结构(包括数组)的高阶函数.

"现代"解决方案如下:

scala> input.show(false)
+-------+-------+-------------------------+
|dept_id|dept_nm|emp_details              |
+-------+-------+-------------------------+
|10     |Finance|[100, 200, 300, 400, 500]|
|20     |IT     |[10, 20, 50, 100]        |
+-------+-------+-------------------------+

input.createOrReplaceTempView("mytable")

val sqlText = "select dept_id, dept_nm, aggregate(emp_details, 0, (acc, value) -> acc + value) as sum from mytable"
scala> sql(sqlText).show
+-------+-------+----+
|dept_id|dept_nm| sum|
+-------+-------+----+
|     10|Finance|1500|
|     20|     IT| 180|
+-------+-------+----+
Run Code Online (Sandbox Code Playgroud)

您可以在以下文章和视频中找到关于高阶函数的好读物:

  1. 在Apache Spark 2.4中为复杂数据类型引入新的内置和高阶函数
  2. 在Databricks上使用SQL中的高阶函数处理嵌套数据
  3. Herman van Hovell(Databricks)介绍Spark SQL中的高阶函数

Spark 2.3.2及更早版本

免责声明由于Spark SQL执行的反序列化,我不推荐这种方法(尽管它得到了最多的赞成)Dataset.map.该查询强制Spark反序列化数据并将其加载到JVM(从JVM外部的Spark管理的内存区域).这将不可避免地导致更频繁的GC,从而使性能变差.

一种解决方案是使用Dataset解决方案,其中Spark SQL和Scala的组合可以显示其功能.

scala> val inventory = Seq(
     |   (10, "Finance", Seq(100, 200, 300, 400, 500)),
     |   (20, "IT", Seq(10, 20, 50, 100))).toDF("dept_id", "dept_nm", "emp_details")
inventory: org.apache.spark.sql.DataFrame = [dept_id: int, dept_nm: string ... 1 more field]

// I'm too lazy today for a case class
scala> inventory.as[(Long, String, Seq[Int])].
  map { case (deptId, deptName, details) => (deptId, deptName, details.sum) }.
  toDF("dept_id", "dept_nm", "sum").
  show
+-------+-------+----+
|dept_id|dept_nm| sum|
+-------+-------+----+
|     10|Finance|1500|
|     20|     IT| 180|
+-------+-------+----+
Run Code Online (Sandbox Code Playgroud)

我将切片部分作为练习,因为它同样简单.


mto*_*oto 5

explode()它可以在您的列上使用一种可能的方法Array,从而通过唯一键聚合输出。例如:

import sqlContext.implicits._
import org.apache.spark.sql.functions._

(mytable
  .withColumn("emp_sum",
    explode($"emp_details"))
  .groupBy("dept_nm")
  .agg(sum("emp_sum")).show)
+-------+------------+
|dept_nm|sum(emp_sum)|
+-------+------------+
|Finance|        1500|
|     IT|         180|
+-------+------------+
Run Code Online (Sandbox Code Playgroud)

要仅选择数组中的特定值,我们可以使用链接问题的答案并稍作修改即可应用:

val slice = udf((array : Seq[Int], from : Int, to : Int) => array.slice(from,to))

(mytable
  .withColumn("slice", 
    slice($"emp_details", 
      lit(0), 
      lit(3)))
  .withColumn("emp_sum",
    explode($"slice"))
  .groupBy("dept_nm")
  .agg(sum("emp_sum")).show)
+-------+------------+
|dept_nm|sum(emp_sum)|
+-------+------------+
|Finance|         600|
|     IT|          80|
+-------+------------+
Run Code Online (Sandbox Code Playgroud)

数据

val data = Seq((10, "Finance", Array(100,200,300,400,500)),
               (20, "IT", Array(10,20,50,100)))
val mytable = sc.parallelize(data).toDF("dept_id", "dept_nm","emp_details")
Run Code Online (Sandbox Code Playgroud)


use*_*411 5

从Spark 2.4开始,您可以使用以下slice功能进行切片:

import org.apache.spark.sql.functions.slice

val df = Seq(
  (10, "Finance", Seq(100, 200, 300, 400, 500)),
  (20, "IT", Seq(10, 20, 50, 100))
).toDF("dept_id", "dept_nm", "emp_details")

val dfSliced = df.withColumn(
   "emp_details_sliced",
   slice($"emp_details", 1, 3)
)

dfSliced.show(false)
Run Code Online (Sandbox Code Playgroud)
import org.apache.spark.sql.functions.slice

val df = Seq(
  (10, "Finance", Seq(100, 200, 300, 400, 500)),
  (20, "IT", Seq(10, 20, 50, 100))
).toDF("dept_id", "dept_nm", "emp_details")

val dfSliced = df.withColumn(
   "emp_details_sliced",
   slice($"emp_details", 1, 3)
)

dfSliced.show(false)
Run Code Online (Sandbox Code Playgroud)

和数组与aggregate

dfSliced.selectExpr(
  "*", 
  "aggregate(emp_details, 0, (x, y) -> x + y) as details_sum",  
  "aggregate(emp_details_sliced, 0, (x, y) -> x + y) as details_sliced_sum"
).show
Run Code Online (Sandbox Code Playgroud)
+-------+-------+-------------------------+------------------+
|dept_id|dept_nm|emp_details              |emp_details_sliced|
+-------+-------+-------------------------+------------------+
|10     |Finance|[100, 200, 300, 400, 500]|[100, 200, 300]   |
|20     |IT     |[10, 20, 50, 100]        |[10, 20, 50]      |
+-------+-------+-------------------------+------------------+
Run Code Online (Sandbox Code Playgroud)