使用 groupby spark 数据帧中的条件聚合

exp*_*ent 0 scala apache-spark apache-spark-sql

我有一个数据框

id lat long lag_lat lag_long detector lag_interval  gpsdt  lead_gpsdt
  1  12   13    12       13        1        [1.5,3.5]  4      4.5
  1  12   13    12       13        1        null       4.5    5
  1  12   13    12       13        1        null       5      5.5
  1  12   13    12       13        1        null       5.5    6
  1  13   14    12       13        2        null       6      6.5
  1  13   14    13       14        2        null       6.5    null
  2  13   14    13       14        2        [0.5,1.5]  2.5    3.5  
  2  13   14    13       14        2        null       3.5    4 
  2  13   14    13       14        2        null       4      null
Run Code Online (Sandbox Code Playgroud)

所以我想在 agg 函数中使用 groupby 时应用一个条件,如果我们执行 groupby col("id") 和 col("detector") 那么我想检查条件,如果该组中的 lag_interval 有任何非空值然后在聚合中我想要两列,其中一列是

 min("lag_interval.col1") and other is max("lead_gpsdt") 
Run Code Online (Sandbox Code Playgroud)

如果不满足上述条件,那么我想要

min("gpsdt"), max("lead_gpsdt")
Run Code Online (Sandbox Code Playgroud)

使用这种方法我想获取有条件的数据

df.groupBy("detector","id").agg(first("lat-long").alias("start_coordinate"),
    last("lat-long").alias("end_coordinate"),struct(min("gpsdt"), max("lead_gpsdt")).as("interval"))
Run Code Online (Sandbox Code Playgroud)

输出

  id interval  start_coordinate end_coordinate
  1   [1.5,6]      [12,13]         [13,14] 
  1   [6,6.5]      [13,14]         [13,14]
  2   [0.5,4]      [13,14]         [13,14]
Run Code Online (Sandbox Code Playgroud)

**

更多解释

** 如果我们看到 groupby("id","detector") 所做的一部分正在取出一部分,

我们必须看到,如果在该组数据中,如果 col("lag_interval") 中的值之一不为空,那么我们需要使用这样的聚合min(lag_interval.col1),max(lead_gpsdt) 这个条件将适用到以下数据集

id lat long lag_lat lag_long detector lag_interval  gpsdt  lead_gpsdt
 1  12   13    12       13        1        [1.5,3.5]  4      4.5
 1  12   13    12       13        1        null       4.5    5
 1  12   13    12       13        1        null       5      5.5
 1  12   13    12       13        1        null       5.5    6
Run Code Online (Sandbox Code Playgroud)

如果该组数据中 col("lag_interval") 的所有值都为空,那么我们需要聚合输出为 min("gpsdt"),max("lead_gpsdt") 此条件将适用于以下数据集

id lat long lag_lat lag_long detector lag_interval  gpsdt  lead_gpsdt
 1  13   14    12       13        2        null       6      6.5
 1  13   14    13       14        2        null       6.5    null
Run Code Online (Sandbox Code Playgroud)

Ram*_*jan 5

您应该使用以下建议的简单when 内置函数来解决您遇到的条件困境

import org.apache.spark.sql.functions._
df.groupBy("id","detector")
  .agg(
    struct(
      when(isnull(min("lag_interval.col1")), min("gpsdt")).otherwise(min("lag_interval.col1")).as("min"),
      max("lead_gpsdt").as(("max"))
    ).as("interval")
  )
Run Code Online (Sandbox Code Playgroud)

这应该给你输出

+---+--------+----------+
|id |detector|interval  |
+---+--------+----------+
|2  |2       |[0.5, 4.0]|
|1  |2       |[6.0, 6.5]|
|1  |1       |[1.5, 6.0]|
+---+--------+----------+
Run Code Online (Sandbox Code Playgroud)

我想你一定已经知道如何做,first("lat-long").alias("start_coordinate"), last("lat-long").alias("end_coordinate")就像你所做的那样。

我希望答案有帮助