Igo*_*her 6 dataframe apache-spark apache-spark-sql pyspark databricks
我想分组聚合一个 pyspark 数据框,同时根据该数据框的另一列删除重复项(保留最后一个值)。
总之,我想将 dropDuplicates 应用于 GroupedData 对象。因此,对于每个组,我只能动态地保留某一列的一行。
对于下面的数据帧,直接的组聚合将是:
from pyspark.sql import functions
dataframe = spark.createDataFrame(
[
(1, "2020-01-01", 1, 1),
(2, "2020-01-01", 2, 1),
(3, "2020-01-02", 1, 1),
(2, "2020-01-02", 1, 1)
],
("id", "ts", "feature", "h3")
).withColumn("ts", functions.col("ts").cast("timestamp"))
# +---+-------------------+-------+---+
# | id| ts|feature| h3|
# +---+-------------------+-------+---+
# | 1|2020-01-01 00:00:00| 1| 1|
# | 2|2020-01-01 00:00:00| 2| 1|
# | 3|2020-01-02 00:00:00| 1| 1|
# | 2|2020-01-02 00:00:00| 1| 1|
# +---+-------------------+-------+---+
aggregated = dataframe.groupby("h3",
functions.window(
timeColumn="ts",
windowDuration="3 days",
slideDuration="1 day",
)
).agg(
functions.sum("feature")
)
aggregated.show(truncate=False)
Run Code Online (Sandbox Code Playgroud)
导致以下数据帧:
+---+------------------------------------------+------------+
|h3 |window |sum(feature)|
+---+------------------------------------------+------------+
|1 |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|3 |
|1 |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|5 |
|1 |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|5 |
|1 |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|2 |
+---+------------------------------------------+------------+
Run Code Online (Sandbox Code Playgroud)
我想聚合只使用最新的状态每个id。在这种情况下,id=2已更新为feature=1at ts=2020-01-02 00:00:00,因此所有基本时间戳大于 的聚合2020-01-02 00:00:00都应仅在 时为列特征使用此状态id=2。预期的聚合数据框是:
+---+------------------------------------------+------------+
|h3 |window |sum(feature)|
+---+------------------------------------------+------------+
|1 |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|3 |
|1 |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|3 |
|1 |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|3 |
|1 |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|2 |
+---+------------------------------------------+------------+
Run Code Online (Sandbox Code Playgroud)
我怎样才能用 pyspark 做到这一点?
我假设 MapType 变量在 Spark 中不应该有重复的键。有了这个假设,我想我可以聚合创建地图的列id -> feature,然后只用 sum (或最终聚合应该是什么)聚合地图值。
所以我做了:
+---+------------------------------------------+------------+
|h3 |window |sum(feature)|
+---+------------------------------------------+------------+
|1 |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|3 |
|1 |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|5 |
|1 |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|5 |
|1 |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|2 |
+---+------------------------------------------+------------+
Run Code Online (Sandbox Code Playgroud)
但后来我发现地图可以有重复的键:
+---+------------------------------------------+--------------------------------+
|h3 |window |id_feature |
+---+------------------------------------------+--------------------------------+
|1 |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|[1 -> 1, 2 -> 2, 3 -> 1, 2 -> 1]|
|1 |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|[1 -> 1, 2 -> 2, 3 -> 1, 2 -> 1]|
|1 |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|[1 -> 1, 2 -> 2] |
|1 |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|[3 -> 1, 2 -> 1] |
+---+------------------------------------------+--------------------------------+
Run Code Online (Sandbox Code Playgroud)
所以它不能解决我的问题。相反,我只是发现了另一个问题。在 Databricks 的笔记本中使用 display 功能时,它显示 MapType 列而没有重复的键。
首先,您可以找到每个 id 和时间窗口的最新记录,然后将最新记录与原始数据帧连接起来。
time_window = window(timeColumn="ts", windowDuration="3 days", slideDuration="1 day")
df2 = df.groupBy("h3", time_window, "id").agg(max("ts").alias("latest"))
df2.alias("a").join(df.alias("b"), (col("a.id") == col("b.id")) & (col("a.latest") == col("b.ts")), "left") \
.select("a.*", "feature") \
.groupBy("h3", "window") \
.agg(sum("feature")) \
.orderBy("window") \
.show(truncate=False)
Run Code Online (Sandbox Code Playgroud)
然后,结果就和你预想的一样了。
+---+------------------------------------------+------------+
|h3 |window |sum(feature)|
+---+------------------------------------------+------------+
|1 |[2019-12-29 00:00:00, 2020-01-01 00:00:00]|3 |
|1 |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|3 |
|1 |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|3 |
|1 |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|2 |
+---+------------------------------------------+------------+
Run Code Online (Sandbox Code Playgroud)