Jon*_*tte 2 scala dataframe apache-spark apache-spark-sql
假设我有一张像:
id,date,value
1,2017-02-12,3
2,2017-03-18,2
1,2017-03-20,5
1,2017-04-01,1
3,2017-04-01,3
2,2017-04-10,2
Run Code Online (Sandbox Code Playgroud)
我已经将它作为数据框(它来自 Hive 表)
现在,我想要一个看起来像(逻辑上)的输出:
id, count($"date">"2017-03"), sum($"value" where $"date">"2017-03"), count($"date">"2017-02"), sum($"value" where $"date">"2017-02")
Run Code Online (Sandbox Code Playgroud)
我试图用一个 agg() 来表达这一点,但我就是不知道如何做内部条件。我知道如何在聚合之前进行过滤,但这并不能满足我对两个不同子范围的需求。
// doesn't do the right thing
myDF.where($"date">"2017-03")
.groupBy("id")
.agg(sum("value") as "value_03", count("value") as "count_03")
.where($"date">"2017-04")
.agg(sum("value") as "value_04", count("value") as "value_04")
Run Code Online (Sandbox Code Playgroud)
在 SQL 中,我会将所有聚合放入单个 SELECT 语句中,并在count
/sum
子句中使用条件。我如何DataFrames
使用 Scala 在 Spark 中做类似的事情?
我能想到的最接近的是计算groupBy(
)之前每个窗口中每个元组的成员资格,并对该成员资格乘以值(以及计数的直和)求和。似乎应该有更好的方法来用条件表达这一点里面agg()
,但我找不到它。
在 SQL 中,我会将所有聚合放入单个 SELECT 语句中,并在 count/sum 子句中使用条件。
你可以在这里做同样的事情:
import org.apache.spark.sql.functions.{sum, when}
myDF
.groupBy($"id")
.agg(
sum(when($"date" > "2017-03", $"value")).alias("value3"),
sum(when($"date" > "2017-04", $"value")).alias("value4")
)
Run Code Online (Sandbox Code Playgroud)
import org.apache.spark.sql.functions.{sum, when}
myDF
.groupBy($"id")
.agg(
sum(when($"date" > "2017-03", $"value")).alias("value3"),
sum(when($"date" > "2017-04", $"value")).alias("value4")
)
Run Code Online (Sandbox Code Playgroud)