Eri*_*son 48 scala apache-spark apache-spark-sql
I have a DataFrame with the schema:
[visitorId: string, trackingIds: array<string>, emailIds: array<string>]
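I'm looking for a way to group (or maybe aggregate?) the DataFrame by visitorId so that the trackingIds and emailIds columns are appended together. So, for example, if my initial df looks like: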
visitorId |trackingIds|emailIds
+-----------+------------+--------
|a158| [666b] | [12]
|7g21| [c0b5] | [45]
|7g21| [c0b4] | [87]
|a158| [666b, 777c]| []
I want my output df to look like this:
visitorId |trackingIds|emailIds
+-----------+------------+--------
|a158| [666b,666b,777c]| [12,'']
|7g21| [c0b5,c0b4] | [45, 87]
I tried using the groupBy and agg operators but didn't have much luck.
zer*_*323 38
Spark >= 2.4
You can replace the flatten udf with the built-in flatten function:
import org.apache.spark.sql.functions.flatten
The rest stays as is.
Spark >= 2.0, < 2.4
It is possible, but quite expensive. Using the data you've provided:
// Needed for toDF and the $"..." column syntax; assumes a SparkSession named spark (as in spark-shell)
import spark.implicits._
case class Record(
visitorId: String, trackingIds: Array[String], emailIds: Array[String])
val df = Seq(
Record("a158", Array("666b"), Array("12")),
Record("7g21", Array("c0b5"), Array("45")),
Record("7g21", Array("c0b4"), Array("87")),
Record("a158", Array("666b", "777c"), Array.empty[String])).toDF
and a helper function:
import org.apache.spark.sql.functions.udf
val flatten = udf((xs: Seq[Seq[String]]) => xs.flatten)
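The UDF body is just Scala's Seq#flatten applied to the per-group list that collect_list builds. A quick plain-Scala check (no Spark involved; UdfBodyModel and collectedEmails are illustrative names, and the input is assumed to be what collect_list would gather for visitor a158) shows why the placeholder step is needed: an empty inner array contributes nothing to the result.

```scala
object UdfBodyModel {
  // Same body as the flatten UDF, without the Spark udf(...) wrapper.
  val flattenBody: Seq[Seq[String]] => Seq[String] = xs => xs.flatten

  def main(args: Array[String]): Unit = {
    // Assumed collect_list result for a158: one inner Seq per input row.
    val collectedEmails = Seq(Seq("12"), Seq.empty[String])
    // The empty inner Seq leaves no trace, so only "12" survives.
    println(flattenBody(collectedEmails))
  }
}
```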
We can fill the blanks with placeholders:
import org.apache.spark.sql.functions.{array, lit, size, when}
// Replace an empty emailIds array with [""] so the row still shows up after flattening
val dfWithPlaceholders = df.withColumn(
"emailIds",
when(size($"emailIds") === 0, array(lit(""))).otherwise($"emailIds"))
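For a non-null array, the when/otherwise expression behaves like the small function below (a plain-Scala sketch; PlaceholderModel and fillEmpty are hypothetical names): an empty array becomes a one-element array holding "", and anything else passes through unchanged. Flattening after this step keeps the '' marker the question asks for. A null array does not satisfy size(...) === 0, so the expression leaves nulls as they are.

```scala
object PlaceholderModel {
  // Mirrors when(size($"emailIds") === 0, array(lit(""))).otherwise($"emailIds")
  // for non-null input: empty arrays become Seq(""), others are unchanged.
  def fillEmpty(emailIds: Seq[String]): Seq[String] =
    if (emailIds.isEmpty) Seq("") else emailIds

  def main(args: Array[String]): Unit = {
    // Per-row emailIds for visitor a158, as collect_list would gather them.
    val collected = Seq(Seq("12"), Seq.empty[String])
    // Fill first, then flatten: the empty row now shows up as "".
    println(collected.map(fillEmpty).flatten)
  }
}
```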
Then collect_list and flatten:
import org.apache.spark.sql.functions.collect_list
// Collect one array per row within each visitorId group, then concatenate them
val emailIds = flatten(collect_list($"emailIds")).alias("emailIds")
val trackingIds = flatten(collect_list($"trackingIds")).alias("trackingIds")
dfWithPlaceholders
.groupBy($"visitorId")
.agg(trackingIds, emailIds)
// +---------+------------------+--------+
// |visitorId| trackingIds|emailIds|
// +---------+------------------+--------+
// | a158|[666b, 666b, 777c]| [12, ]|
// | 7g21| [c0b5, c0b4]|[45, 87]|
// +---------+------------------+--------+
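Per visitorId, flatten(collect_list(...)) concatenates the arrays of every row in the group. The plain-Scala model below (Visit and GroupAggModel are hypothetical names; no Spark involved) reproduces the table above from the placeholder-filled data. One difference to keep in mind: the model keeps input order, while Spark's documentation notes that collect_list is non-deterministic because the order of collected results depends on row order, which may change after a shuffle.

```scala
// Plain-Scala model of groupBy($"visitorId").agg(flatten(collect_list(...)), ...).
final case class Visit(visitorId: String, trackingIds: Seq[String], emailIds: Seq[String])

object GroupAggModel {
  def aggregate(rows: Seq[Visit]): Map[String, Visit] =
    rows.groupBy(_.visitorId).map { case (id, group) =>
      // collect_list: one inner Seq per row; flatten: concatenate them in order.
      id -> Visit(id, group.map(_.trackingIds).flatten, group.map(_.emailIds).flatten)
    }

  // Input after the placeholder step: the empty emailIds array is already Seq("").
  val withPlaceholders: Seq[Visit] = Seq(
    Visit("a158", Seq("666b"), Seq("12")),
    Visit("7g21", Seq("c0b5"), Seq("45")),
    Visit("7g21", Seq("c0b4"), Seq("87")),
    Visit("a158", Seq("666b", "777c"), Seq(""))
  )

  def main(args: Array[String]): Unit =
    aggregate(withPlaceholders).values.foreach(println)
}
```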
Using a statically typed Dataset:
dfWithPlaceholders.as[Record]
.groupByKey(_.visitorId)
.mapGroups { case (key, vs) =>
vs.map(v => (v.trackingIds, v.emailIds)).toArray.unzip match {
case (trackingIds, emailIds) =>
Record(key, trackingIds.flatten, emailIds.flatten)
}}
// +---------+------------------+--------+
// |visitorId| trackingIds|emailIds|
// +---------+------------------+--------+
// | a158|[666b, 666b, 777c]| [12, ]|
// | 7g21| [c0b5, c0b4]|[45, 87]|
// +---------+------------------+--------+
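The mapGroups lambda can be understood without Spark: it receives the key and an Iterator over that group's records, unzips the two array columns and flattens each. The sketch below runs the same body on a plain Iterator (MapGroupsModel and merge are hypothetical names; Record mirrors the case class above). Because Scala arrays compare by reference, the output is converted with toSeq before printing.

```scala
// Same shape as the Record case class in the answer.
final case class Record(visitorId: String, trackingIds: Array[String], emailIds: Array[String])

object MapGroupsModel {
  // Body of the mapGroups lambda, applied to an ordinary Iterator.
  def merge(key: String, vs: Iterator[Record]): Record =
    vs.map(v => (v.trackingIds, v.emailIds)).toArray.unzip match {
      case (trackingIds, emailIds) =>
        // Both sides are Array[Array[String]]; flatten concatenates the inner arrays.
        Record(key, trackingIds.flatten, emailIds.flatten)
    }

  def main(args: Array[String]): Unit = {
    val group = Iterator(
      Record("a158", Array("666b"), Array("12")),
      Record("a158", Array("666b", "777c"), Array(""))
    )
    val merged = merge("a158", group)
    // Arrays print as references, so convert to Seq for readable output.
    println((merged.trackingIds.toSeq, merged.emailIds.toSeq))
  }
}
```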
Spark 1.x
You can convert to an RDD and group:
import org.apache.spark.sql.Row
dfWithPlaceholders.rdd
.map {
case Row(id: String,
trcks: Seq[String @ unchecked],
emails: Seq[String @ unchecked]) => (id, (trcks, emails))
}
.groupByKey
.map {case (key, vs) => vs.toArray.unzip match {
case (trackingIds, emailIds) =>
Record(key, trackingIds.flatten, emailIds.flatten)
}}
.toDF
// +---------+------------------+--------+
// |visitorId| trackingIds|emailIds|
// +---------+------------------+--------+
// | 7g21| [c0b5, c0b4]|[45, 87]|
// | a158|[666b, 666b, 777c]| [12, ]|
// +---------+------------------+--------+
Jac*_*ski 20
@zero323's answer is pretty much complete, but Spark gives us even more flexibility. How about the following solution?
import org.apache.spark.sql.functions._
inventory
.select($"*", explode($"trackingIds") as "tracking_id")
.select($"*", explode($"emailIds") as "email_id")
.groupBy("visitorId")
.agg(
collect_list("tracking_id") as "trackingIds",
collect_list("email_id") as "emailIds")
This leaves out all empty collections, though (so there is some room for improvement :))
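The behaviour described above can be reproduced without Spark. In the plain-Scala sketch below (ExplodeModel, explodeBoth and regroup are hypothetical names), each explode is modelled as a flatMap over one array column: the a158 row whose emailIds is empty produces no output row at all, so 777c is dropped, and a row with several tracking IDs and several email IDs is expanded into their cross product, which duplicates values after regrouping.

```scala
object ExplodeModel {
  // One input row: (visitorId, trackingIds, emailIds).
  type InRow = (String, Seq[String], Seq[String])

  // Two chained explodes: one output row per (trackingId, emailId) pair.
  // An empty array yields no pairs, so the whole input row disappears.
  def explodeBoth(rows: Seq[InRow]): Seq[(String, String, String)] =
    for {
      (id, tracking, emails) <- rows
      t <- tracking
      e <- emails
    } yield (id, t, e)

  // Models groupBy("visitorId").agg(collect_list("tracking_id"), collect_list("email_id")).
  def regroup(rows: Seq[(String, String, String)]): Map[String, (Seq[String], Seq[String])] =
    rows.groupBy(_._1).map { case (id, g) => id -> ((g.map(_._2), g.map(_._3))) }

  def main(args: Array[String]): Unit = {
    val input: Seq[InRow] = Seq(
      ("a158", Seq("666b"), Seq("12")),
      ("7g21", Seq("c0b5"), Seq("45")),
      ("7g21", Seq("c0b4"), Seq("87")),
      ("a158", Seq("666b", "777c"), Seq.empty[String])
    )
    // a158 keeps only its first row: 777c is lost together with the empty emailIds row.
    println(regroup(explodeBoth(input)))
  }
}
```

Spark 2.2 and later also offer explode_outer, which emits a null element instead of dropping a row whose array is empty or null; it does not remove the cross-product effect of chaining two explodes.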