d80*_*tb7 5 scala apache-spark apache-spark-sql apache-spark-dataset
假设我有一个这样的数据结构,其中ts是一些时间戳
case class Record(ts: Long, id: Int, value: Int)
Run Code Online (Sandbox Code Playgroud)
鉴于大量这些记录,我希望最终得到每个id具有最高时间戳的记录.使用RDD api我认为以下代码完成了工作:
def findLatest(records: RDD[Record])(implicit spark: SparkSession) = {
records.keyBy(_.id).reduceByKey{
(x, y) => if(x.ts > y.ts) x else y
}.values
}
Run Code Online (Sandbox Code Playgroud)
同样,这是我对数据集的尝试:
def findLatest(records: Dataset[Record])(implicit spark: SparkSession) = {
records.groupByKey(_.id).mapGroups{
case(id, records) => {
records.reduceLeft((x,y) => if (x.ts > y.ts) x else y)
}
}
}
Run Code Online (Sandbox Code Playgroud)
我正在尝试研究如何使用数据框来实现类似的东西,但无济于事 - 我意识到我可以使用以下方法进行分组:
records.groupBy($"id")
Run Code Online (Sandbox Code Playgroud)
但是这给了我一个RelationGroupedDataSet,我不清楚我需要编写什么聚合函数来实现我想要的东西 - 我看到的所有示例聚合似乎都只关注返回一个聚合而不是整行的列.
是否可以使用数据框来实现这一目标?
Ass*_*son 10
您可以使用argmax逻辑(参见数据库示例)
例如,假设您的数据框名为df,并且它具有列id,val,ts,您可以执行以下操作:
import org.apache.spark.sql.functions._
val newDF = df.groupBy('id).agg.max(struct('ts, 'val)) as 'tmp).select($"id", $"tmp.*")
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
10828 次 |
最近记录: |