PySpark:在日期为字符串的范围内按日期字段过滤DataFrame

mar*_*tin 10 python datetime date dataframe pyspark

我的数据帧包含一个日期字段,它以字符串格式显示,例如

'2015-07-02T11:22:21.050Z'
Run Code Online (Sandbox Code Playgroud)

我需要在日期过滤DataFrame以仅获取上周的记录.所以,我正在尝试使用strptime将字符串日期转换为datetime对象的地图方法:

def map_to_datetime(row):
     format_string = '%Y-%m-%dT%H:%M:%S.%fZ'
     row.date = datetime.strptime(row.date, format_string)

df = df.map(map_to_datetime)
Run Code Online (Sandbox Code Playgroud)

然后我会应用过滤器作为

df.filter(lambda row:
    row.date >= (datetime.today() - timedelta(days=7)))
Run Code Online (Sandbox Code Playgroud)

我设法让映射工作,但过滤器失败了

TypeError:condition应该是string或Column

有没有办法以一种有效的方式使用过滤,或者我应该改变方法以及如何?

mar*_*tin 9

我找到了一种方法来解决我的问题,使用SparkSQL API,日期保持为字符串并执行此操作:

last_week = (datetime.today() - timedelta(days=7)).strftime(format='%Y-%m-%d')

new_df = df.where(df.date >= last_week)
Run Code Online (Sandbox Code Playgroud)


zer*_*323 8

您可以在不使用工作方Python代码并切换到RDD的情况下解决此问题.首先,由于您使用ISO 8601字符串,您的数据可以直接转换为日期或时间戳:

from pyspark.sql.functions import col

df = sc.parallelize([
    ('2015-07-02T11:22:21.050Z', ),
    ('2016-03-20T21:00:00.000Z', )
]).toDF(("d_str", ))

df_casted = df.select("*",
    col("d_str").cast("date").alias("dt"), 
    col("d_str").cast("timestamp").alias("ts"))
Run Code Online (Sandbox Code Playgroud)

这将节省JVM和Python之间的一次往返.还有一些方法可以接近第二部分.仅限日期:

from pyspark.sql.functions import current_date, datediff, unix_timestamp

df_casted.where(datediff(current_date(), col("dt")) < 7)
Run Code Online (Sandbox Code Playgroud)

时间戳:

def days(i: int) -> int:
    return 60 * 60 * 24 * i

df_casted.where(unix_timestamp() - col("ts").cast("long") < days(7))
Run Code Online (Sandbox Code Playgroud)

你也可以看看current_timestampdate_sub

注意:我会避免使用DataFrame.map.最好使用它DataFrame.rdd.map.当切换到2.0+时,它将为您节省一些工作