根据spark中的日期范围填充数据

war*_*ner 2 scala apache-spark pyspark

我有示例数据集,我想根据开始日期和结束日期(从 2016-01-01 到 2016-01-08)用 0 填充日期。

id,date,quantity
1,2016-01-03,10
1,2016-01-04,20
1,2016-01-06,30
1,2016-01-07,20
2,2016-01-02,10
2,2016-01-03,10
2,2016-01-04,20
2,2016-01-06,20
2,2016-01-07,20
Run Code Online (Sandbox Code Playgroud)

根据下面链接的解决方案,我能够实现部分解决方案。 填充 Spark 数据框列中缺失的日期

有人可以建议如何填写从开始日期到结束日期的日期,甚至从开始日期到结束日期。

id,date,quantity
1,2016-01-01,0
1,2016-01-02,0
1,2016-01-03,10
1,2016-01-04,20
1,2016-01-05,0
1,2016-01-06,30
1,2016-01-07,20
1,2016-01-08,0
2,2016-01-01,0
2,2016-01-02,10
2,2016-01-03,10
2,2016-01-04,20
2,2016-01-05,0
2,2016-01-06,20
2,2016-01-07,20
2,2016-01-08,0
Run Code Online (Sandbox Code Playgroud)

Shu*_*Shu 5

Spark-2.4开始,使用sequence函数生成 中的所有日期2016-01-01 to 2016-01--08

  • 然后连接到原始数据框coalesce以获取quantity and id值。

Example:

df1=sql("select explode(sequence(date('2016-01-01'),date('2016-01-08'),INTERVAL 1 DAY)) as date").\
withColumn("quantity",lit(0)).\
withColumn("id",lit(1))

df1.show()
#+----------+--------+---+
#|      date|quantity| id|
#+----------+--------+---+
#|2016-01-01|       0|  1|
#|2016-01-02|       0|  1|
#|2016-01-03|       0|  1|
#|2016-01-04|       0|  1|
#|2016-01-05|       0|  1|
#|2016-01-06|       0|  1|
#|2016-01-07|       0|  1|
#|2016-01-08|       0|  1|
#+----------+--------+---+

df.show()
#+---+----------+--------+
#| id|      date|quantity|
#+---+----------+--------+
#|  1|2016-01-03|      10|
#|  1|2016-01-04|      20|
#|  1|2016-01-06|      30|
#|  1|2016-01-07|      20|
#+---+----------+--------+


from pyspark.sql.functions import *
from pyspark.sql.types import *

exprs=['date']+[coalesce(col('df.'f'{f}'),col('df1.'f'{f}')).alias(f) for f in df1.columns if f not in ['date']]

df1.\
alias("df1").\
join(df.alias("df"),['date'],'left').\
select(*exprs).\
orderBy("date").\
show()

#+----------+--------+---+
#|      date|quantity| id|
#+----------+--------+---+
#|2016-01-01|       0|  1|
#|2016-01-02|       0|  1|
#|2016-01-03|      10|  1|
#|2016-01-04|      20|  1|
#|2016-01-05|       0|  1|
#|2016-01-06|      30|  1|
#|2016-01-07|      20|  1|
#|2016-01-08|       0|  1|
#+----------+--------+---+
Run Code Online (Sandbox Code Playgroud)

Update:

df=spark.createDataFrame([(1,'2016-01-03',10),(1,'2016-01-04',20),(1,'2016-01-06',30),(1,'2016-01-07',20),(2,'2016-01-02',10),(2,'2016-01-03',10),(2,'2016-01-04',20),(2,'2016-01-06',20),(2,'2016-01-07',20)],["id","date","quantity"])

df1=df.selectExpr("id").distinct().selectExpr("id","explode(sequence(date('2016-01-01'),date('2016-01-08'),INTERVAL 1 DAY)) as date").withColumn("quantity",lit(0))

from pyspark.sql.functions import *
from pyspark.sql.types import *

exprs=[coalesce(col('df.'f'{f}'),col('df1.'f'{f}')).alias(f) for f in df1.columns]


df2=df1.alias("df1").join(df.alias("df"),(col("df1.date") == col("df.date"))& (col("df1.id") == col("df.id")),'left').select(*exprs)

df2.orderBy("id","date").show()
#+---+----------+--------+
#| id|      date|quantity|
#+---+----------+--------+
#|  1|2016-01-01|       0|
#|  1|2016-01-02|       0|
#|  1|2016-01-03|      10|
#|  1|2016-01-04|      20|
#|  1|2016-01-05|       0|
#|  1|2016-01-06|      30|
#|  1|2016-01-07|      20|
#|  1|2016-01-08|       0|
#|  2|2016-01-01|       0|
#|  2|2016-01-02|      10|
#|  2|2016-01-03|      10|
#|  2|2016-01-04|      20|
#|  2|2016-01-05|       0|
#|  2|2016-01-06|      20|
#|  2|2016-01-07|      20|
#|  2|2016-01-08|       0|
#+---+----------+--------+
Run Code Online (Sandbox Code Playgroud)