Filling missing dates in a Spark DataFrame column

Ish*_*han 2 datetime scala apache-spark apache-spark-sql

I have a Spark DataFrame with two columns: "date" of type timestamp and "quantity" of type long. For each date I have some value for quantity. The dates are sorted in increasing order, but some dates are missing. For example, the current df:

Date        |    Quantity
10-09-2016  |    1
11-09-2016  |    2
14-09-2016  |    0
16-09-2016  |    1
17-09-2016  |    0
20-09-2016  |    2

As you can see, the df has some missing dates, such as 12-09-2016 and 13-09-2016. I want to add those missing dates with 0 in the quantity field, so the resulting df should look like:

Date        |    Quantity
10-09-2016  |    1
11-09-2016  |    2
12-09-2016  |    0
13-09-2016  |    0
14-09-2016  |    0
15-09-2016  |    0
16-09-2016  |    1
17-09-2016  |    0
18-09-2016  |    0
19-09-2016  |    0
20-09-2016  |    2
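Before the Spark-based answers below, the underlying idea can be sketched in plain Python (illustrative only; `fill_missing_dates` is a hypothetical helper, not part of any answer): walk day by day from the first to the last date, taking the known quantity where one exists and 0 otherwise.

```python
from datetime import date, timedelta

def fill_missing_dates(rows):
    """rows: list of (date, quantity) pairs sorted by date.
    Returns the same series with quantity 0 inserted for missing days."""
    known = dict(rows)
    start, end = rows[0][0], rows[-1][0]
    out = []
    d = start
    while d <= end:
        out.append((d, known.get(d, 0)))  # 0 for any date not present
        d += timedelta(days=1)
    return out

rows = [(date(2016, 9, 10), 1), (date(2016, 9, 11), 2), (date(2016, 9, 14), 0),
        (date(2016, 9, 16), 1), (date(2016, 9, 17), 0), (date(2016, 9, 20), 2)]
filled = fill_missing_dates(rows)  # 11 consecutive days, 10th through 20th
```

This is fine for data that fits in memory; the answers below do the same thing distributedly in Spark.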

Any help/suggestions on this would be appreciated. Thanks in advance. Please note that I am coding in Scala.

Fer*_*rgo 10

Building on @mrsrinivas's excellent answer, here is the PySpark version.

Required imports:

from typing import List
import datetime
from pyspark.sql import DataFrame, Window
from pyspark.sql.functions import col, lit, udf, datediff, lead, explode
from pyspark.sql.types import DateType, ArrayType

UDF for creating the range of next dates:

def _get_next_dates(start_date: datetime.date, diff: int) -> List[datetime.date]:
    return [start_date + datetime.timedelta(days=days) for days in range(1, diff)]

Function for creating a DataFrame of fill dates (with support for "grouping" columns):

def _get_fill_dates_df(df: DataFrame, date_column: str, group_columns: List[str], fill_column: str) -> DataFrame:
    get_next_dates_udf = udf(_get_next_dates, ArrayType(DateType()))

    window = Window.orderBy(*group_columns, date_column)

    # Compute the day gap to the next row; rows with a gap > 1 generate the
    # missing dates in between, which are then exploded into one row each.
    # Note: lit("0") fills with the string "0"; cast it if fill_column is numeric.
    return df.withColumn("_diff", datediff(lead(date_column, 1).over(window), date_column)) \
        .filter(col("_diff") > 1).withColumn("_next_dates", get_next_dates_udf(date_column, "_diff")) \
        .withColumn(fill_column, lit("0")).withColumn(date_column, explode("_next_dates")) \
        .drop("_diff", "_next_dates")

Usage of the function:

fill_df = _get_fill_dates_df(df, "Date", [], "Quantity")
df = df.union(fill_df)

It assumes that the date column is already of date type.


mrs*_*vas 9

I have written this answer in a verbose way for easier understanding of the code; it can be optimized.

Required imports:

import java.time.format.DateTimeFormatter
import java.time.{LocalDate, LocalDateTime}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, TimestampType}

UDF to convert a String to a valid date format:

val date_transform = udf((date: String) => {
  val dtFormatter = DateTimeFormatter.ofPattern("d-M-y")
  val dt = LocalDate.parse(date, dtFormatter)
  "%4d-%2d-%2d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth)
    .replaceAll(" ", "0")
})
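For reference, the same normalization in plain Python (an illustrative equivalent, not part of the answer): `strptime` with `%d-%m-%Y` accepts the non-padded day/month forms that the `d-M-y` pattern allows, and `strftime` zero-pads the output the same way the `"%4d-%2d-%2d" ... replaceAll(" ", "0")` trick does.

```python
from datetime import datetime

def date_transform(date_str: str) -> str:
    # Parse day-month-year (padding optional) and emit zero-padded "yyyy-MM-dd"
    dt = datetime.strptime(date_str, "%d-%m-%Y")
    return dt.strftime("%Y-%m-%d")
```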

The UDF code below is taken from the answer to Iterate over dates range:

def fill_dates = udf((start: String, excludedDiff: Int) => {
  val dtFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val fromDt = LocalDateTime.parse(start, dtFormatter)
  (1 to (excludedDiff - 1)).map(day => {
    val dt = fromDt.plusDays(day)
    "%4d-%2d-%2d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth)
      .replaceAll(" ", "0")
  })
})
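The same exclusive-range logic in plain Python (illustrative mirror of the Scala UDF): given the datediff to the next row, it returns the `excludedDiff - 1` formatted dates strictly between the two existing rows.

```python
from datetime import datetime, timedelta

def fill_dates(start: str, excluded_diff: int):
    # start is a "yyyy-MM-dd HH:mm:ss" timestamp string, as in the Scala UDF
    from_dt = datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
    return [(from_dt + timedelta(days=day)).strftime("%Y-%m-%d")
            for day in range(1, excluded_diff)]
```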

Set up the sample DataFrame (df):

val df = Seq(
      ("10-09-2016", 1),
      ("11-09-2016", 2),
      ("14-09-2016", 0),
      ("16-09-2016", 1),
      ("17-09-2016", 0),
      ("20-09-2016", 2)).toDF("date", "quantity")
      .withColumn("date", date_transform($"date").cast(TimestampType))
      .withColumn("quantity", $"quantity".cast(LongType))

df.printSchema()
root
 |-- date: timestamp (nullable = true)
 |-- quantity: long (nullable = false)


df.show()    
+-------------------+--------+
|               date|quantity|
+-------------------+--------+
|2016-09-10 00:00:00|       1|
|2016-09-11 00:00:00|       2|
|2016-09-14 00:00:00|       0|
|2016-09-16 00:00:00|       1|
|2016-09-17 00:00:00|       0|
|2016-09-20 00:00:00|       2|
+-------------------+--------+

Create a temporary DataFrame (tempDf) to union with df:

val w = Window.orderBy($"date")
val tempDf = df.withColumn("diff", datediff(lead($"date", 1).over(w), $"date"))
  .filter($"diff" > 1) // Pick date diff more than one day to generate our date
  .withColumn("next_dates", fill_dates($"date", $"diff"))
  .withColumn("quantity", lit("0"))
  .withColumn("date", explode($"next_dates"))
  .withColumn("date", $"date".cast(TimestampType))

tempDf.show(false)
+-------------------+--------+----+------------------------+
|date               |quantity|diff|next_dates              |
+-------------------+--------+----+------------------------+
|2016-09-12 00:00:00|0       |3   |[2016-09-12, 2016-09-13]|
|2016-09-13 00:00:00|0       |3   |[2016-09-12, 2016-09-13]|
|2016-09-15 00:00:00|0       |2   |[2016-09-15]            |
|2016-09-18 00:00:00|0       |3   |[2016-09-18, 2016-09-19]|
|2016-09-19 00:00:00|0       |3   |[2016-09-18, 2016-09-19]|
+-------------------+--------+----+------------------------+

Now combine the two DataFrames:

val result = df.union(tempDf.select("date", "quantity"))
  .orderBy("date")

result.show()
+-------------------+--------+
|               date|quantity|
+-------------------+--------+
|2016-09-10 00:00:00|       1|
|2016-09-11 00:00:00|       2|
|2016-09-12 00:00:00|       0|
|2016-09-13 00:00:00|       0|
|2016-09-14 00:00:00|       0|
|2016-09-15 00:00:00|       0|
|2016-09-16 00:00:00|       1|
|2016-09-17 00:00:00|       0|
|2016-09-18 00:00:00|       0|
|2016-09-19 00:00:00|       0|
|2016-09-20 00:00:00|       2|
+-------------------+--------+