有没有办法使用 pyspark.sql.functions.date_add 和 col('column_name') 作为第二个参数而不是静态整数?

Mab*_*loq 5 pyspark

在 ETL 过程中,我有一个 5 位整数格式的 SAS 日期字段,表示自 1960 年 1 月 1 日以来的天数。为了使该数据列在分析中更有用,我想将该列转换为 Redshift 中的日期数据类型字段。

目前我正在尝试在 pyspark 中执行此操作,如下所示:

  • 使用字符串文字“1960-01-01”创建了新列“sas_date”

  • 使用 pyspark.sql.function.date_add 我将“sas-date”列作为开始日期参数传递,将整数值“arrival_date”列作为第二个参数传递。

  • 当 date_add 函数运行时,我收到错误 Column not iterable,即使我认为arrival_date 列是一个系列意味着它是可迭代的。但事实并非如此,为什么呢?

  • 当我删除“arrival_date”列并将其替换为静态整数值(例如 1)时,date_add 函数将起作用。

i94 = i94.withColumn('arrival_date', col('arrival_date').cast(Int()))
i94 = i94.withColumn('sas_date', lit("1960-01-01"))
i94 = i94.withColumn('arrival_date', date_add(col('sas_date'), i94['arrival_date']))
Run Code Online (Sandbox Code Playgroud)

我希望能够传递我的列,以便第二个 date_add 参数是动态的。然而 date_add 似乎不接受这个?如果 date_addd 不能完成此任务,除了使用 UDF 之外我还有什么其他选择?

更新: date_add() 操作之前的数据状态

i94.printSchema()
Run Code Online (Sandbox Code Playgroud)

|-- cic_id:双精度(可空 = true)

|-- Visa_id:字符串(可空 = true)

|-- port_id: 字符串 (nullable = true)

|--airline_id: 字符串 (nullable = true)

|-- cit_id:双精度(可空 = true)

|-- res_id:双精度(可空 = true)

|-- 年:双精度(可空 = true)

|-- 月份:双精度(可空 = true)

|-- 年龄:双精度(可空 = true)

|-- 性别:字符串(可空 = true)

|-- 到达日期:整数(可为 null = true)

|-- 出发日期:双精度(可空 = true)

|-- date_begin: 字符串 (nullable = true)

|-- date_end: 字符串 (nullable = true)

|-- sas_date:字符串(可空 = false)

i94.limit(10).toPandas()
Run Code Online (Sandbox Code Playgroud)

toPandas() 结果

SMa*_*MaZ 6

我认为你是绝对正确的,date_add被设计为int仅在Spark <3.0.0之前取值:

在 Spark scala 实现中,我看到以下几行。它表明无论我们将其传递给函数的什么值,date_add它都会再次转换为列lit

火花<3.0.0

def date_add(start: Column, days: Int): Column = date_add(start, lit(days))

火花 >=3.0.0

def date_add(start: Column, days: Column): Column = withExpr { DateAdd(start.expr, days.expr) }

  • 现在让我们谈谈解决方案,我可以想到两种方法:

导入并准备一小部分数据集:

import pyspark.sql.functions as f
import pyspark.sql.types as t
from datetime import datetime
from datetime import timedelta

l1 = [(5748517.0,'1960-01-01', 20574), (5748517.0,'1960-01-01', 20574), (5748517.0,'1960-01-01', 20574)]
df = spark.createDataFrame(l1).toDF('cic_id','sas_date','arrival_date')
df.show()
+---------+----------+------------+
|   cic_id|  sas_date|arrival_date|
+---------+----------+------------+
|5748517.0|1960-01-01|       20574|
|5748517.0|1960-01-01|       20574|
|5748517.0|1960-01-01|       20574|
+---------+----------+------------+
Run Code Online (Sandbox Code Playgroud)

现在,有两种方法可以实现功能。

  1. UDF方式:
def date_add_(date, days):

    # Type check and convert to datetime object
    # Format and other things should be handle more delicately
    if type(date) is not datetime:
        date = datetime.strptime('1960-01-01', "%Y-%m-%d")
    return date + timedelta(days)


date_add_udf = f.udf(date_add_, t.DateType())

df.withColumn('actual_arrival_date', date_add_udf(f.to_date('sas_date'), 'arrival_date')).show()
+---------+----------+------------+-------------------+
|   cic_id|  sas_date|arrival_date|actual_arrival_date|
+---------+----------+------------+-------------------+
|5748517.0|1960-01-01|       20574|         2016-04-30|
|5748517.0|1960-01-01|       20574|         2016-04-30|
|5748517.0|1960-01-01|       20574|         2016-04-30|
+---------+----------+------------+-------------------+

Run Code Online (Sandbox Code Playgroud)
  1. 使用expr评价:
df.withColumn('new_arrival_date', f.expr("date_add(sas_date, arrival_date)")).show()
+---------+----------+------------+----------------+
|   cic_id|  sas_date|arrival_date|new_arrival_date|
+---------+----------+------------+----------------+
|5748517.0|1960-01-01|       20574|      2016-04-30|
|5748517.0|1960-01-01|       20574|      2016-04-30|
|5748517.0|1960-01-01|       20574|      2016-04-30|
+---------+----------+------------+----------------+
Run Code Online (Sandbox Code Playgroud)