Ken*_*ams 7 grouping scala dataframe apache-spark
我有一个我正在操作的DataFrame,我希望按一组列进行分组,并在其余列上按组操作.在普通RDD地区,我认为它看起来像这样:
rdd.map( tup => ((tup._1, tup._2, tup._3), tup) ).
groupByKey().
forEachPartition( iter => doSomeJob(iter) )
Run Code Online (Sandbox Code Playgroud)
在DataFrame陆地我会这样开始:
df.groupBy("col1", "col2", "col3") // Reference by name
Run Code Online (Sandbox Code Playgroud)
但是如果我的操作比GroupedData提供的平均/最小/最大/计数更复杂,那么我不确定如何操作组.
例如,我想为每个("col1", "col2", "col3")组构建一个MongoDB文档(通过遍历组中的关联Rows),缩小到N分区,然后将文档插入MongoDB数据库.该N限制是同时连接我想的最多数量.
有什么建议?
您可以自行加入。首先获取组:
val groups = df.groupBy($"col1", $"col2", $"col3").agg($"col1", $"col2", $"col3")
Run Code Online (Sandbox Code Playgroud)
然后你可以将其连接回原始 DataFrame:
val joinedDF = groups
.select($"col1" as "l_col1", $"col2" as "l_col2", $"col3" as "l_col3)
.join(df, $"col1" <=> $"l_col1" and $"col2" <=> $"l_col2" and $"col3" <=> $"l_col3")
Run Code Online (Sandbox Code Playgroud)
虽然这会为您提供与原来完全相同的数据(并且带有 3 个额外的冗余列),但您可以执行另一个联接,为与该行关联的 (col1、col2、col3) 组添加具有 MongoDB 文档 ID 的列。
无论如何,根据我的经验,连接和自连接是处理 DataFrame 中复杂内容的方式。
| 归档时间: |
|
| 查看次数: |
6404 次 |
| 最近记录: |