Jre*_*her 5 cross-join dataframe apache-spark apache-spark-sql pyspark
在这种情况下,我需要帮助来用新行填充缺失值:
这只是一个例子,但我有很多行具有不同的IDs.
输入数据框:
| ID | 旗帜 | 日期 |
|---|---|---|
| 123 | 1 | 01/01/2021 |
| 123 | 0 | 2021年1月2日 |
| 123 | 1 | 2021年1月3日 |
| 123 | 0 | 2021年1月6日 |
| 123 | 0 | 2021年1月8日 |
| 第777章 | 0 | 01/01/2021 |
| 第777章 | 1 | 2021年1月3日 |
所以我有一组有限的dates,我想直到每个的最后一个ID(在示例中,对于ID = 123:01/01/2021、01/02/2021、01/03/2021...直到01/08/2021 )。所以基本上我可以与日历进行交叉联接,但我不知道在交叉联接之后如何使用规则或过滤器填充缺失值。
预期输出:(以粗体显示生成的缺失值)
| ID | 旗帜 | 日期 |
|---|---|---|
| 123 | 1 | 01/01/2021 |
| 123 | 0 | 2021年1月2日 |
| 123 | 1 | 2021年1月3日 |
| 123 | 1 | 01/04/2021 |
| 123 | 1 | 01/05/2021 |
| 123 | 0 | 2021年1月6日 |
| 123 | 0 | 2021年1月7日 |
| 123 | 0 | 2021年1月8日 |
| 第777章 | 0 | 01/01/2021 |
| 第777章 | 0 | 2021年1月2日 |
| 第777章 | 1 | 2021年1月3日 |
bla*_*hop 11
您可以首先进行分组来id计算最大值和最小值date,然后使用函数生成从到 的sequence所有日期。最后,与原始数据帧连接并用每组的最后一个非空值填充空值。这是一个完整的工作示例:min_datemax_dateid
您的输入数据框:
from pyspark.sql import Window
import pyspark.sql.functions as F
df = spark.createDataFrame([
(123, 1, "01/01/2021"), (123, 0, "01/02/2021"),
(123, 1, "01/03/2021"), (123, 0, "01/06/2021"),
(123, 0, "01/08/2021"), (777, 0, "01/01/2021"),
(777, 1, "01/03/2021")
], ["id", "flag", "date"])
Run Code Online (Sandbox Code Playgroud)
Groupbyid并为每个生成所有可能的日期id:
all_dates_df = df.groupBy("id").agg(
F.date_trunc("mm", F.max(F.to_date("date", "dd/MM/yyyy"))).alias("max_date"),
F.date_trunc("mm", F.min(F.to_date("date", "dd/MM/yyyy"))).alias("min_date")
).select(
"id",
F.expr("sequence(min_date, max_date, interval 1 month)").alias("date")
).withColumn(
"date", F.explode("date")
).withColumn(
"date",
F.date_format("date", "dd/MM/yyyy")
)
Run Code Online (Sandbox Code Playgroud)
现在,左连接并在按 分区的窗口上df使用函数来填充空值:lastid
w = Window.partitionBy("id").orderBy("date")
result = all_dates_df.join(df, ["id", "date"], "left").select(
"id",
"date",
*[F.last(F.col(c), ignorenulls=True).over(w).alias(c)
for c in df.columns if c not in ("id", "date")
]
)
result.show()
#+---+----------+----+
#| id| date|flag|
#+---+----------+----+
#|123|01/01/2021| 1|
#|123|01/02/2021| 0|
#|123|01/03/2021| 1|
#|123|01/04/2021| 1|
#|123|01/05/2021| 1|
#|123|01/06/2021| 0|
#|123|01/07/2021| 0|
#|123|01/08/2021| 0|
#|777|01/01/2021| 0|
#|777|01/02/2021| 0|
#|777|01/03/2021| 1|
#+---+----------+----+
Run Code Online (Sandbox Code Playgroud)
DATE您可以找到当前行和下一行中的值之间的日期范围,然后sequence用于生成所有中间日期并分解此数组以填充缺失日期的值。
from pyspark.sql import functions as F
from pyspark.sql import Window
data = [(123, 1, "01/01/2021",),
(123, 0, "01/02/2021",),
(123, 1, "01/03/2021",),
(123, 0, "01/06/2021",),
(123, 0, "01/08/2021",),
(777, 0, "01/01/2021",),
(777, 1, "01/03/2021",), ]
df = spark.createDataFrame(data, ("ID", "FLAG", "DATE",)).withColumn("DATE", F.to_date(F.col("DATE"), "dd/MM/yyyy"))
window_spec = Window.partitionBy("ID").orderBy("DATE")
next_date = F.coalesce(F.lead("DATE", 1).over(window_spec), F.col("DATE") + F.expr("interval 1 month"))
end_date_range = next_date - F.expr("interval 1 month")
df.withColumn("Ranges", F.sequence(F.col("DATE"), end_date_range, F.expr("interval 1 month")))\
.withColumn("DATE", F.explode("Ranges"))\
.withColumn("DATE", F.date_format("date", "dd/MM/yyyy"))\
.drop("Ranges").show(truncate=False)
Run Code Online (Sandbox Code Playgroud)
+---+----+----------+
|ID |FLAG|DATE |
+---+----+----------+
|123|1 |01/01/2021|
|123|0 |01/02/2021|
|123|1 |01/03/2021|
|123|1 |01/04/2021|
|123|1 |01/05/2021|
|123|0 |01/06/2021|
|123|0 |01/07/2021|
|123|0 |01/08/2021|
|777|0 |01/01/2021|
|777|0 |01/02/2021|
|777|1 |01/03/2021|
+---+----+----------+
Run Code Online (Sandbox Code Playgroud)