Spark DataFrame:对组进行操作

Ken*_*ams 7 grouping scala dataframe apache-spark

我有一个我正在操作的DataFrame,我希望按一组列进行分组,并在其余列上按组操作.在普通RDD地区,我认为它看起来像这样:

rdd.map( tup => ((tup._1, tup._2, tup._3), tup) ).
  groupByKey().
  forEachPartition( iter => doSomeJob(iter) )
Run Code Online (Sandbox Code Playgroud)

DataFrame陆地我会这样开始:

df.groupBy("col1", "col2", "col3")  // Reference by name
Run Code Online (Sandbox Code Playgroud)

但是如果我的操作比GroupedData提供的平均/最小/最大/计数更复杂,那么我不确定如何操作组.

例如,我想为每个("col1", "col2", "col3")组构建一个MongoDB文档(通过遍历组中的关联Rows),缩小到N分区,然后将文档插入MongoDB数据库.该N限制是同时连接我想的最多数量.

有什么建议?

Dav*_*fin 1

您可以自行加入。首先获取组:

val groups = df.groupBy($"col1", $"col2", $"col3").agg($"col1", $"col2", $"col3")
Run Code Online (Sandbox Code Playgroud)

然后你可以将其连接回原始 DataFrame:

val joinedDF = groups
  .select($"col1" as "l_col1", $"col2" as "l_col2", $"col3" as "l_col3)
  .join(df, $"col1" <=> $"l_col1" and $"col2" <=> $"l_col2" and  $"col3" <=> $"l_col3")
Run Code Online (Sandbox Code Playgroud)

虽然这会为您提供与原来完全相同的数据(并且带有 3 个额外的冗余列),但您可以执行另一个联接,为与该行关联的 (col1、col2、col3) 组添加具有 MongoDB 文档 ID 的列。

无论如何,根据我的经验,连接和自连接是处理 DataFrame 中复杂内容的方式。