war*_*ner 2 scala apache-spark pyspark
我有示例数据集,我想根据开始日期和结束日期(从 2016-01-01 到 2016-01-08)用 0 填充日期。
id,date,quantity
1,2016-01-03,10
1,2016-01-04,20
1,2016-01-06,30
1,2016-01-07,20
2,2016-01-02,10
2,2016-01-03,10
2,2016-01-04,20
2,2016-01-06,20
2,2016-01-07,20
Run Code Online (Sandbox Code Playgroud)
根据下面链接的解决方案,我能够实现部分解决方案。 填充 Spark 数据框列中缺失的日期
有人可以建议如何填写从开始日期到结束日期的日期,甚至从开始日期到结束日期。
id,date,quantity
1,2016-01-01,0
1,2016-01-02,0
1,2016-01-03,10
1,2016-01-04,20
1,2016-01-05,0
1,2016-01-06,30
1,2016-01-07,20
1,2016-01-08,0
2,2016-01-01,0
2,2016-01-02,10
2,2016-01-03,10
2,2016-01-04,20
2,2016-01-05,0
2,2016-01-06,20
2,2016-01-07,20
2,2016-01-08,0
Run Code Online (Sandbox Code Playgroud)
从Spark-2.4开始,使用sequence函数生成 中的所有日期2016-01-01 to 2016-01--08。
coalesce以获取quantity and id值。Example:
df1=sql("select explode(sequence(date('2016-01-01'),date('2016-01-08'),INTERVAL 1 DAY)) as date").\
withColumn("quantity",lit(0)).\
withColumn("id",lit(1))
df1.show()
#+----------+--------+---+
#| date|quantity| id|
#+----------+--------+---+
#|2016-01-01| 0| 1|
#|2016-01-02| 0| 1|
#|2016-01-03| 0| 1|
#|2016-01-04| 0| 1|
#|2016-01-05| 0| 1|
#|2016-01-06| 0| 1|
#|2016-01-07| 0| 1|
#|2016-01-08| 0| 1|
#+----------+--------+---+
df.show()
#+---+----------+--------+
#| id| date|quantity|
#+---+----------+--------+
#| 1|2016-01-03| 10|
#| 1|2016-01-04| 20|
#| 1|2016-01-06| 30|
#| 1|2016-01-07| 20|
#+---+----------+--------+
from pyspark.sql.functions import *
from pyspark.sql.types import *
exprs=['date']+[coalesce(col('df.'f'{f}'),col('df1.'f'{f}')).alias(f) for f in df1.columns if f not in ['date']]
df1.\
alias("df1").\
join(df.alias("df"),['date'],'left').\
select(*exprs).\
orderBy("date").\
show()
#+----------+--------+---+
#| date|quantity| id|
#+----------+--------+---+
#|2016-01-01| 0| 1|
#|2016-01-02| 0| 1|
#|2016-01-03| 10| 1|
#|2016-01-04| 20| 1|
#|2016-01-05| 0| 1|
#|2016-01-06| 30| 1|
#|2016-01-07| 20| 1|
#|2016-01-08| 0| 1|
#+----------+--------+---+
Run Code Online (Sandbox Code Playgroud)
Update:
df=spark.createDataFrame([(1,'2016-01-03',10),(1,'2016-01-04',20),(1,'2016-01-06',30),(1,'2016-01-07',20),(2,'2016-01-02',10),(2,'2016-01-03',10),(2,'2016-01-04',20),(2,'2016-01-06',20),(2,'2016-01-07',20)],["id","date","quantity"])
df1=df.selectExpr("id").distinct().selectExpr("id","explode(sequence(date('2016-01-01'),date('2016-01-08'),INTERVAL 1 DAY)) as date").withColumn("quantity",lit(0))
from pyspark.sql.functions import *
from pyspark.sql.types import *
exprs=[coalesce(col('df.'f'{f}'),col('df1.'f'{f}')).alias(f) for f in df1.columns]
df2=df1.alias("df1").join(df.alias("df"),(col("df1.date") == col("df.date"))& (col("df1.id") == col("df.id")),'left').select(*exprs)
df2.orderBy("id","date").show()
#+---+----------+--------+
#| id| date|quantity|
#+---+----------+--------+
#| 1|2016-01-01| 0|
#| 1|2016-01-02| 0|
#| 1|2016-01-03| 10|
#| 1|2016-01-04| 20|
#| 1|2016-01-05| 0|
#| 1|2016-01-06| 30|
#| 1|2016-01-07| 20|
#| 1|2016-01-08| 0|
#| 2|2016-01-01| 0|
#| 2|2016-01-02| 10|
#| 2|2016-01-03| 10|
#| 2|2016-01-04| 20|
#| 2|2016-01-05| 0|
#| 2|2016-01-06| 20|
#| 2|2016-01-07| 20|
#| 2|2016-01-08| 0|
#+---+----------+--------+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2088 次 |
| 最近记录: |