获取Spark DataFrame中两个日期之间的所有日期

Sha*_*kar 7 apache-spark-sql pyspark

我有一个DF,我有bookingDtarrivalDt列.我需要找到这两个日期之间的所有日期.

示例代码:

df = spark.sparkContext.parallelize(
            [Row(vyge_id=1000, bookingDt='2018-01-01', arrivalDt='2018-01-05')]).toDF()
diffDaysDF = df.withColumn("diffDays", datediff('arrivalDt', 'bookingDt'))
diffDaysDF.show()
Run Code Online (Sandbox Code Playgroud)

代码输出:

+----------+----------+-------+--------+
| arrivalDt| bookingDt|vyge_id|diffDays|
+----------+----------+-------+--------+
|2018-01-05|2018-01-01|   1000|       4|
+----------+----------+-------+--------+
Run Code Online (Sandbox Code Playgroud)

我尝试的是找到两个日期之间的天数,并使用timedelta函数计算所有日期explode.

dateList = [str(bookingDt + timedelta(i)) for i in range(diffDays)]
Run Code Online (Sandbox Code Playgroud)

预期产量:

基本上,我需要建立一个DF与对之间的每个日的记录bookingDtarrivalDt,包容性.

+----------+----------+-------+----------+
| arrivalDt| bookingDt|vyge_id|txnDt     |
+----------+----------+-------+----------+
|2018-01-05|2018-01-01|   1000|2018-01-01|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01|   1000|2018-01-02|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01|   1000|2018-01-03|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01|   1000|2018-01-04|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01|   1000|2018-01-05|
+----------+----------+-------+----------+
Run Code Online (Sandbox Code Playgroud)

wer*_*ner 11

对于Spark 2.4+ 序列,可用于创建包含bookingDt和之间所有日期的数组arrivalDt。然后可以分解该阵列。

from pyspark.sql import functions as F

df = df \
  .withColumn('bookingDt', F.col('bookingDt').cast('date')) \
  .withColumn('arrivalDt', F.col('arrivalDt').cast('date'))

df.withColumn('txnDt', F.explode(F.expr('sequence(bookingDt, arrivalDt, interval 1 day)')))\
  .show()
Run Code Online (Sandbox Code Playgroud)

输出:

+-------+----------+----------+----------+
|vyge_id| bookingDt| arrivalDt|     txnDt|
+-------+----------+----------+----------+
|   1000|2018-01-01|2018-01-05|2018-01-01|
|   1000|2018-01-01|2018-01-05|2018-01-02|
|   1000|2018-01-01|2018-01-05|2018-01-03|
|   1000|2018-01-01|2018-01-05|2018-01-04|
|   1000|2018-01-01|2018-01-05|2018-01-05|
+-------+----------+----------+----------+
Run Code Online (Sandbox Code Playgroud)


pau*_*ult 8

只要你使用的Spark 2.1或更高版本,你可以利用这一事实,我们可以使用的列值作为参数使用时pyspark.sql.functions.expr():

码:

import pyspark.sql.functions as f

diffDaysDF.withColumn("repeat", f.expr("split(repeat(',', diffDays), ',')"))\
    .select("*", f.posexplode("repeat").alias("txnDt", "val"))\
    .drop("repeat", "val", "diffDays")\
    .withColumn("txnDt", f.expr("date_add(bookingDt, txnDt)"))\
    .show()
#+----------+----------+-------+----------+
#| arrivalDt| bookingDt|vyge_id|     txnDt|
#+----------+----------+-------+----------+
#|2018-01-05|2018-01-01|   1000|2018-01-01|
#|2018-01-05|2018-01-01|   1000|2018-01-02|
#|2018-01-05|2018-01-01|   1000|2018-01-03|
#|2018-01-05|2018-01-01|   1000|2018-01-04|
#|2018-01-05|2018-01-01|   1000|2018-01-05|
#+----------+----------+-------+----------+
Run Code Online (Sandbox Code Playgroud)


vvg*_*vvg 5

好吧,您可以执行以下操作。

创建仅包含日期的数据框:

dates_df#与第一天到bookingDt最后天之间的所有天数arrivalDt

然后在以下条件之间加入这些df:

df.join(dates_df, 
  on=col('dates_df.dates').between(col('df.bookindDt'), col('dt.arrivalDt'))
.select('df.*', 'dates_df.dates')
Run Code Online (Sandbox Code Playgroud)

它可能比使用解决方案更快explode,但是您需要确定此df的开始日期和结束日期。 10年df仅有3650条记录,不必担心太多。