yve*_*ves 2 arrays scala apache-spark apache-spark-sql
这是在Spark 2.1 中,鉴于此输入文件:
`order.json
{"id":1,"price":202.30,"userid":1}
{"id":2,"price":343.99,"userid":1}
{"id":3,"price":399.99,"userid":2}
Run Code Online (Sandbox Code Playgroud)
以及以下数据帧:
val order = sqlContext.read.json("order.json")
val df2 = order.select(struct("*") as 'order)
val df3 = df2.groupBy("order.userId").agg( collect_list( $"order").as("array"))
Run Code Online (Sandbox Code Playgroud)
df3有以下内容:
{"id":1,"price":202.30,"userid":1}
{"id":2,"price":343.99,"userid":1}
{"id":3,"price":399.99,"userid":2}
Run Code Online (Sandbox Code Playgroud)
和结构:
val order = sqlContext.read.json("order.json")
val df2 = order.select(struct("*") as 'order)
val df3 = df2.groupBy("order.userId").agg( collect_list( $"order").as("array"))
Run Code Online (Sandbox Code Playgroud)
现在假设我得到了 df3:
我想为每个 userId 计算 array.price 的总和,利用每个 userId 行的数组。
我会将此计算添加到结果数据框中的新列中。就像我已经完成了 df3.withColumn("sum", lit(0)),但是用我的计算替换了 lit(0)。
它会假设是直截了当的,但我坚持两者。我没有找到任何方法来访问整个数组来计算每行(例如使用 foldLeft)。
Spark 2.4.0及以上版本
您现在可以使用AGGREGATE功能。
df3.createOrReplaceTempView("orders")
spark.sql(
"""
|SELECT
| *,
| AGGREGATE(`array`, 0.0, (accumulator, item) -> accumulator + item.price) AS totalPrice
|FROM
| orders
|""".stripMargin).show()
Run Code Online (Sandbox Code Playgroud)
我想为每个 userId 计算 array.price 的总和,利用数组
不幸的是,这里有一个数组对你不利。Spark SQL 和DataFrameDSL都没有提供可以直接用于在任意大小的数组上处理此任务而无需先分解 ( explode) 的工具。
您可以使用 UDF:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}
val totalPrice = udf((xs: Seq[Row]) => xs.map(_.getAs[Double]("price")).sum)
df3.withColumn("totalPrice", totalPrice($"array"))
Run Code Online (Sandbox Code Playgroud)
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}
val totalPrice = udf((xs: Seq[Row]) => xs.map(_.getAs[Double]("price")).sum)
df3.withColumn("totalPrice", totalPrice($"array"))
Run Code Online (Sandbox Code Playgroud)
或转换为静态类型Dataset:
df3
.as[(Long, Seq[(Long, Double, Long)])]
.map{ case (id, xs) => (id, xs, xs.map(_._2).sum) }
.toDF("userId", "array", "totalPrice").show
Run Code Online (Sandbox Code Playgroud)
+------+--------------------+----------+
|userId| array|totalPrice|
+------+--------------------+----------+
| 1|[[1,202.3,1], [2,...| 546.29|
| 2| [[3,399.99,2]]| 399.99|
+------+--------------------+----------+
Run Code Online (Sandbox Code Playgroud)
如上所述,您分解和聚合:
import org.apache.spark.sql.functions.{sum, first}
df3
.withColumn("price", explode($"array.price"))
.groupBy($"userId")
.agg(sum($"price"), df3.columns.tail.map(c => first(c).alias(c)): _*)
Run Code Online (Sandbox Code Playgroud)
df3
.as[(Long, Seq[(Long, Double, Long)])]
.map{ case (id, xs) => (id, xs, xs.map(_._2).sum) }
.toDF("userId", "array", "totalPrice").show
Run Code Online (Sandbox Code Playgroud)
但它很昂贵并且不使用现有结构。
您可以使用一个丑陋的技巧:
import org.apache.spark.sql.functions.{coalesce, lit, max, size}
val totalPrice = (0 to df3.agg(max(size($"array"))).as[Int].first)
.map(i => coalesce($"array.price".getItem(i), lit(0.0)))
.foldLeft(lit(0.0))(_ + _)
df3.withColumn("totalPrice", totalPrice)
Run Code Online (Sandbox Code Playgroud)
+------+--------------------+----------+
|userId| array|totalPrice|
+------+--------------------+----------+
| 1|[[1,202.3,1], [2,...| 546.29|
| 2| [[3,399.99,2]]| 399.99|
+------+--------------------+----------+
Run Code Online (Sandbox Code Playgroud)
但这与其说是真正的解决方案,不如说是一种好奇心。