在Spark中使用groupBy并返回DataFrame

Dea*_*an 5 scala apache-spark apache-spark-sql

使用Scala处理spark中的数据帧时遇到了困难.如果我有一个数据框,我想提取一列唯一条目,当我使用时,groupBy我没有得到一个数据帧.

例如,我有一个DataFrame具有以下形式的被调用日志:

machine_id  | event     | other_stuff
 34131231   | thing     |   stuff
 83423984   | notathing | notstuff
 34131231   | thing    | morestuff
Run Code Online (Sandbox Code Playgroud)

我想要一个独特的机器ID,其中事件是存储在新的东西DataFrame,允许我做某种过滤.运用

val machineId = logs
  .where($"event" === "thing")
  .select("machine_id")
  .groupBy("machine_id")
Run Code Online (Sandbox Code Playgroud)

我得到了一个Grouped Data的val,这是一个痛苦的使用(或我不知道如何正确使用这种对象).得到这个唯一机器ID的列表后,我想用它来过滤另一个DataFrame机器ID以提取单个机器ID的所有事件.

我可以看到我会定期做这类事情,基本的工作流程是:

  1. 从日志表中提取唯一ID.
  2. 使用唯一ID来提取特定ID的所有事件.
  3. 对已提取的数据进行某种分析.

这是前两个步骤,我将在这里感谢一些指导.

我很欣赏这个例子有点做作,但希望它解释了我的问题.可能是我对GroupedData对象不够了解或者(因为我希望)我在数据框中遗漏了一些东西,这使得这很容易.我正在使用基于Scala 2.10.4构建的spark 1.5.

谢谢

zer*_*323 7

只是使用distinctgroupBy:

val machineId = logs.where($"event"==="thing").select("machine_id").distinct
Run Code Online (Sandbox Code Playgroud)

这将等同于SQL:

SELECT DISTINCT machine_id FROM logs WHERE event = 'thing'
Run Code Online (Sandbox Code Playgroud)

GroupedData不打算直接使用.它提供了许多方法,其中agg最常用,可用于应用不同的聚合函数并将其转换回DataFrame.在SQL方面你以后有什么wheregroupBy相当于这样的事情

SELECT machine_id, ... FROM logs WHERE event = 'thing' GROUP BY machine_id
Run Code Online (Sandbox Code Playgroud)

其中...必须通过提供agg或等效方法.