PySpark中具有多列的日期算法

Tur*_*hin 4 python apache-spark pyspark spark-dataframe

我正在尝试使用PySpark数据框中的多个列进行一些中等复杂的日期算术.基本上,我有一个名为column的列number表示created_at我需要过滤的时间戳之后的周数.在PostgreSQL中,你可以根据列中的值乘以一个间隔,但我似乎无法弄清楚如何使用SQL API或Python API在PySpark中执行此操作.这里的任何帮助将不胜感激!

import datetime
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark import SparkContext

sc = SparkContext()
sqlContext = SQLContext(sc)
start_date = datetime.date(2020,1,1)

my_df = sc.parallelize([
        Row(id=1, created_at=datetime.datetime(2020, 1, 1), number=1,  metric=10),
        Row(id=1, created_at=datetime.datetime(2020, 1, 1), number=2,  metric=10),
        Row(id=1, created_at=datetime.datetime(2020, 1, 1), number=3,  metric=10),
        Row(id=2, created_at=datetime.datetime(2020, 1, 15), number=1,  metric=20),
        Row(id=2, created_at=datetime.datetime(2020, 1, 15), number=2,  metric=20),
        Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=7,  metric=30),
        Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=8,  metric=30),
        Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=9,  metric=30),
        Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=10, metric=30),
    ]).toDF()


# This doesn't work!
new_df = my_df.where("created_at + interval 7 days * number > '" + start_date.strftime("%Y-%m-%d") +"'")
# Neither does this!
new_df = my_df.filter(my_df.created_at + datetime.timedelta(days=my_df.number * 7)).date() > start_date.date()
Run Code Online (Sandbox Code Playgroud)

这里有一个可能的解决方案,需要将日期转换为字符串,使用datetimepython中的库将字符串转换为datetime对象,然后执行操作,但这看起来很疯狂.

Tur*_*hin 6

好吧,我想出了使用expr内置date_add函数的前进方法.

from pyspark.sql.functions import expr, date_add
new_df = my_df.withColumn('test', expr('date_add(created_at, number*7)'))
filtered = new_df.filter(new_df.test > start_date)
filtered.show()
Run Code Online (Sandbox Code Playgroud)

但是,如果其他人想要添加,我会非常喜欢一般性地了解其工作方式/原因.