如何使用spark sql过滤特定聚合的行?

Mar*_*ról 7 sql aggregate apache-spark apache-spark-sql spark-dataframe

通常,组中的所有行都将传递给聚合函数.我想使用条件过滤行,以便只将组中的某些行传递给聚合函数.PostgreSQL可以实现这样的操作.我想用Spark SQL DataFrame(Spark 2.0.0)做同样的事情.

代码可能看起来像这样:

val df = ... // some data frame
df.groupBy("A").agg(
  max("B").where("B").less(10), // there is no such method as `where` :(
  max("C").where("C").less(5)
)
Run Code Online (Sandbox Code Playgroud)

所以对于像这样的数据框:

| A | B | C |
|  1| 14|  4|
|  1|  9|  3|
|  2|  5|  6|
Run Code Online (Sandbox Code Playgroud)

结果将是:

|A|max(B)|max(C)|
|1|    9|      4|
|2|    5|   null|
Run Code Online (Sandbox Code Playgroud)

是否可以使用Spark SQL?

请注意,通常max可以使用任何其他聚合函数,并且在具有任意过滤条件的同一列上可能存在多个聚合.

use*_*459 13

val df = Seq(
    (1,14,4),
    (1,9,3),
    (2,5,6)
  ).toDF("a","b","c")

val aggregatedDF = df.groupBy("a")
  .agg(
    max(when($"b" < 10, $"b")).as("MaxB"),
    max(when($"c" < 5, $"c")).as("MaxC")
  )

aggregatedDF.show
Run Code Online (Sandbox Code Playgroud)

  • 如果你解释一下你在这里做什么会很好 (4认同)
  • 请注意,当列 b &gt;= 10 时,when($"b" &lt; 10, $"b") 会产生空值。如果 b 永远不会小于 10,则输出aggregateDF 中会产生空值。如果您希望在这种情况下为零,您可以将when($"b" &lt; 10, $"b")替换为when($"b" &lt; 10, $"b").otherwise(0) (2认同)

小智 0

    >>> df = sc.parallelize([[1,14,1],[1,9,3],[2,5,6]]).map(lambda t: Row(a=int(t[0]),b=int(t[1]),c=int(t[2]))).toDF()
    >>> df.registerTempTable('t')
   >>> res = sqlContext.sql("select a,max(case when b<10 then b else null end) mb,max(case when c<5 then c else null end) mc from t group by a")

    +---+---+----+
    |  a| mb|  mc|
    +---+---+----+
    |  1|  9|   3|
    |  2|  5|null|
    +---+---+----+
Run Code Online (Sandbox Code Playgroud)

你可以使用sql(我相信你在Postgres中做了同样的事情?)