Sha*_*kar 7 apache-spark-sql pyspark
我有一个DF,我有bookingDt和arrivalDt列.我需要找到这两个日期之间的所有日期.
示例代码:
df = spark.sparkContext.parallelize(
[Row(vyge_id=1000, bookingDt='2018-01-01', arrivalDt='2018-01-05')]).toDF()
diffDaysDF = df.withColumn("diffDays", datediff('arrivalDt', 'bookingDt'))
diffDaysDF.show()
Run Code Online (Sandbox Code Playgroud)
代码输出:
+----------+----------+-------+--------+
| arrivalDt| bookingDt|vyge_id|diffDays|
+----------+----------+-------+--------+
|2018-01-05|2018-01-01| 1000| 4|
+----------+----------+-------+--------+
Run Code Online (Sandbox Code Playgroud)
我尝试的是找到两个日期之间的天数,并使用timedelta函数计算所有日期explode.
dateList = [str(bookingDt + timedelta(i)) for i in range(diffDays)]
Run Code Online (Sandbox Code Playgroud)
预期产量:
基本上,我需要建立一个DF与对之间的每个日的记录bookingDt和arrivalDt,包容性.
+----------+----------+-------+----------+
| arrivalDt| bookingDt|vyge_id|txnDt |
+----------+----------+-------+----------+
|2018-01-05|2018-01-01| 1000|2018-01-01|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01| 1000|2018-01-02|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01| 1000|2018-01-03|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01| 1000|2018-01-04|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01| 1000|2018-01-05|
+----------+----------+-------+----------+
Run Code Online (Sandbox Code Playgroud)
wer*_*ner 11
对于Spark 2.4+ 序列,可用于创建包含bookingDt和之间所有日期的数组arrivalDt。然后可以分解该阵列。
from pyspark.sql import functions as F
df = df \
.withColumn('bookingDt', F.col('bookingDt').cast('date')) \
.withColumn('arrivalDt', F.col('arrivalDt').cast('date'))
df.withColumn('txnDt', F.explode(F.expr('sequence(bookingDt, arrivalDt, interval 1 day)')))\
.show()
Run Code Online (Sandbox Code Playgroud)
输出:
+-------+----------+----------+----------+
|vyge_id| bookingDt| arrivalDt| txnDt|
+-------+----------+----------+----------+
| 1000|2018-01-01|2018-01-05|2018-01-01|
| 1000|2018-01-01|2018-01-05|2018-01-02|
| 1000|2018-01-01|2018-01-05|2018-01-03|
| 1000|2018-01-01|2018-01-05|2018-01-04|
| 1000|2018-01-01|2018-01-05|2018-01-05|
+-------+----------+----------+----------+
Run Code Online (Sandbox Code Playgroud)
只要你使用的Spark 2.1或更高版本,你可以利用这一事实,我们可以使用的列值作为参数使用时pyspark.sql.functions.expr():
diffDays','以将其转换为大小数组diffDays pyspark.sql.functions.posexplode()与它一起指数爆炸这个数组pyspark.sql.functions.date_add()添加索引值的天数bookingDt码:
import pyspark.sql.functions as f
diffDaysDF.withColumn("repeat", f.expr("split(repeat(',', diffDays), ',')"))\
.select("*", f.posexplode("repeat").alias("txnDt", "val"))\
.drop("repeat", "val", "diffDays")\
.withColumn("txnDt", f.expr("date_add(bookingDt, txnDt)"))\
.show()
#+----------+----------+-------+----------+
#| arrivalDt| bookingDt|vyge_id| txnDt|
#+----------+----------+-------+----------+
#|2018-01-05|2018-01-01| 1000|2018-01-01|
#|2018-01-05|2018-01-01| 1000|2018-01-02|
#|2018-01-05|2018-01-01| 1000|2018-01-03|
#|2018-01-05|2018-01-01| 1000|2018-01-04|
#|2018-01-05|2018-01-01| 1000|2018-01-05|
#+----------+----------+-------+----------+
Run Code Online (Sandbox Code Playgroud)
好吧,您可以执行以下操作。
创建仅包含日期的数据框:
dates_df#与第一天到bookingDt最后天之间的所有天数arrivalDt
然后在以下条件之间加入这些df:
df.join(dates_df,
on=col('dates_df.dates').between(col('df.bookindDt'), col('dt.arrivalDt'))
.select('df.*', 'dates_df.dates')
Run Code Online (Sandbox Code Playgroud)
它可能比使用解决方案更快explode,但是您需要确定此df的开始日期和结束日期。
10年df仅有3650条记录,不必担心太多。
| 归档时间: |
|
| 查看次数: |
6595 次 |
| 最近记录: |