Mar*_*ról 7 sql aggregate apache-spark apache-spark-sql spark-dataframe
通常,组中的所有行都将传递给聚合函数.我想使用条件过滤行,以便只将组中的某些行传递给聚合函数.PostgreSQL可以实现这样的操作.我想用Spark SQL DataFrame(Spark 2.0.0)做同样的事情.
代码可能看起来像这样:
val df = ... // some data frame
df.groupBy("A").agg(
max("B").where("B").less(10), // there is no such method as `where` :(
max("C").where("C").less(5)
)
Run Code Online (Sandbox Code Playgroud)
所以对于像这样的数据框:
| A | B | C |
| 1| 14| 4|
| 1| 9| 3|
| 2| 5| 6|
Run Code Online (Sandbox Code Playgroud)
结果将是:
|A|max(B)|max(C)|
|1| 9| 4|
|2| 5| null|
Run Code Online (Sandbox Code Playgroud)
是否可以使用Spark SQL?
请注意,通常max可以使用任何其他聚合函数,并且在具有任意过滤条件的同一列上可能存在多个聚合.
use*_*459 13
val df = Seq(
(1,14,4),
(1,9,3),
(2,5,6)
).toDF("a","b","c")
val aggregatedDF = df.groupBy("a")
.agg(
max(when($"b" < 10, $"b")).as("MaxB"),
max(when($"c" < 5, $"c")).as("MaxC")
)
aggregatedDF.show
Run Code Online (Sandbox Code Playgroud)
小智 0
>>> df = sc.parallelize([[1,14,1],[1,9,3],[2,5,6]]).map(lambda t: Row(a=int(t[0]),b=int(t[1]),c=int(t[2]))).toDF()
>>> df.registerTempTable('t')
>>> res = sqlContext.sql("select a,max(case when b<10 then b else null end) mb,max(case when c<5 then c else null end) mc from t group by a")
+---+---+----+
| a| mb| mc|
+---+---+----+
| 1| 9| 3|
| 2| 5|null|
+---+---+----+
Run Code Online (Sandbox Code Playgroud)
你可以使用sql(我相信你在Postgres中做了同样的事情?)
| 归档时间: |
|
| 查看次数: |
7818 次 |
| 最近记录: |