Scala / DataFrame / Spark:如何表达多个条件聚合?

Jon*_*tte 2 scala dataframe apache-spark apache-spark-sql

假设我有一张像:

id,date,value
1,2017-02-12,3
2,2017-03-18,2
1,2017-03-20,5
1,2017-04-01,1
3,2017-04-01,3
2,2017-04-10,2
Run Code Online (Sandbox Code Playgroud)

我已经将它作为数据框(它来自 Hive 表)

现在,我想要一个看起来像(逻辑上)的输出:

id, count($"date">"2017-03"), sum($"value" where $"date">"2017-03"), count($"date">"2017-02"), sum($"value" where $"date">"2017-02")
Run Code Online (Sandbox Code Playgroud)

我试图用一个 agg() 来表达这一点,但我就是不知道如何做内部条件。我知道如何在聚合之前进行过滤,但这并不能满足我对两个不同子范围的需求。

// doesn't do the right thing
myDF.where($"date">"2017-03")
  .groupBy("id")
  .agg(sum("value") as "value_03", count("value") as "count_03")
  .where($"date">"2017-04")
  .agg(sum("value") as "value_04", count("value") as "value_04")
Run Code Online (Sandbox Code Playgroud)

在 SQL 中,我会将所有聚合放入单个 SELECT 语句中,并在count/sum子句中使用条件。我如何DataFrames使用 Scala 在 Spark 中做类似的事情?

我能想到的最接近的是计算groupBy()之前每个窗口中每个元组的成员资格,并对该成员资格乘以值(以及计数的直和)求和。似乎应该有更好的方法来用条件表达这一点里面agg(),但我找不到它。

zer*_*323 5

在 SQL 中,我会将所有聚合放入单个 SELECT 语句中,并在 count/sum 子句中使用条件。

你可以在这里做同样的事情:

import org.apache.spark.sql.functions.{sum, when}

myDF
  .groupBy($"id")
  .agg(
    sum(when($"date" > "2017-03", $"value")).alias("value3"),
    sum(when($"date" > "2017-04", $"value")).alias("value4")
  )
Run Code Online (Sandbox Code Playgroud)
import org.apache.spark.sql.functions.{sum, when}

myDF
  .groupBy($"id")
  .agg(
    sum(when($"date" > "2017-03", $"value")).alias("value3"),
    sum(when($"date" > "2017-04", $"value")).alias("value4")
  )
Run Code Online (Sandbox Code Playgroud)