在 ETL 过程中,我有一个 5 位整数格式的 SAS 日期字段,表示自 1960 年 1 月 1 日以来的天数。为了使该数据列在分析中更有用,我想将该列转换为 Redshift 中的日期数据类型字段。
目前我正在尝试在 pyspark 中执行此操作,如下所示:
使用字符串文字“1960-01-01”创建了新列“sas_date”
使用 pyspark.sql.function.date_add 我将“sas-date”列作为开始日期参数传递,将整数值“arrival_date”列作为第二个参数传递。
当 date_add 函数运行时,我收到错误 Column not iterable,即使我认为arrival_date 列是一个系列意味着它是可迭代的。但事实并非如此,为什么呢?
当我删除“arrival_date”列并将其替换为静态整数值(例如 1)时,date_add 函数将起作用。
i94 = i94.withColumn('arrival_date', col('arrival_date').cast(Int()))
i94 = i94.withColumn('sas_date', lit("1960-01-01"))
i94 = i94.withColumn('arrival_date', date_add(col('sas_date'), i94['arrival_date']))
Run Code Online (Sandbox Code Playgroud)
我希望能够传递我的列,以便第二个 date_add 参数是动态的。然而 date_add 似乎不接受这个?如果 date_addd 不能完成此任务,除了使用 UDF 之外我还有什么其他选择?
更新: date_add() 操作之前的数据状态
i94.printSchema()
Run Code Online (Sandbox Code Playgroud)
根
|-- cic_id:双精度(可空 = true)
|-- Visa_id:字符串(可空 = true)
|-- port_id: 字符串 (nullable = true)
|--airline_id: 字符串 (nullable = true)
|-- cit_id:双精度(可空 = true)
|-- res_id:双精度(可空 = true)
|-- 年:双精度(可空 = true)
|-- 月份:双精度(可空 = true)
|-- 年龄:双精度(可空 = true)
|-- 性别:字符串(可空 = true)
|-- 到达日期:整数(可为 null = true)
|-- 出发日期:双精度(可空 = true)
|-- date_begin: 字符串 (nullable = true)
|-- date_end: 字符串 (nullable = true)
|-- sas_date:字符串(可空 = false)
i94.limit(10).toPandas()
Run Code Online (Sandbox Code Playgroud)
我认为你是绝对正确的,date_add被设计为int仅在Spark <3.0.0之前取值:
在 Spark scala 实现中,我看到以下几行。它表明无论我们将其传递给函数的什么值,date_add它都会再次转换为列lit
def date_add(start: Column, days: Int): Column = date_add(start, lit(days))
def date_add(start: Column, days: Column): Column = withExpr { DateAdd(start.expr, days.expr) }
导入并准备一小部分数据集:
import pyspark.sql.functions as f
import pyspark.sql.types as t
from datetime import datetime
from datetime import timedelta
l1 = [(5748517.0,'1960-01-01', 20574), (5748517.0,'1960-01-01', 20574), (5748517.0,'1960-01-01', 20574)]
df = spark.createDataFrame(l1).toDF('cic_id','sas_date','arrival_date')
df.show()
+---------+----------+------------+
| cic_id| sas_date|arrival_date|
+---------+----------+------------+
|5748517.0|1960-01-01| 20574|
|5748517.0|1960-01-01| 20574|
|5748517.0|1960-01-01| 20574|
+---------+----------+------------+
Run Code Online (Sandbox Code Playgroud)
现在,有两种方法可以实现功能。
def date_add_(date, days):
# Type check and convert to datetime object
# Format and other things should be handle more delicately
if type(date) is not datetime:
date = datetime.strptime('1960-01-01', "%Y-%m-%d")
return date + timedelta(days)
date_add_udf = f.udf(date_add_, t.DateType())
df.withColumn('actual_arrival_date', date_add_udf(f.to_date('sas_date'), 'arrival_date')).show()
+---------+----------+------------+-------------------+
| cic_id| sas_date|arrival_date|actual_arrival_date|
+---------+----------+------------+-------------------+
|5748517.0|1960-01-01| 20574| 2016-04-30|
|5748517.0|1960-01-01| 20574| 2016-04-30|
|5748517.0|1960-01-01| 20574| 2016-04-30|
+---------+----------+------------+-------------------+
Run Code Online (Sandbox Code Playgroud)
expr评价:df.withColumn('new_arrival_date', f.expr("date_add(sas_date, arrival_date)")).show()
+---------+----------+------------+----------------+
| cic_id| sas_date|arrival_date|new_arrival_date|
+---------+----------+------------+----------------+
|5748517.0|1960-01-01| 20574| 2016-04-30|
|5748517.0|1960-01-01| 20574| 2016-04-30|
|5748517.0|1960-01-01| 20574| 2016-04-30|
+---------+----------+------------+----------------+
Run Code Online (Sandbox Code Playgroud)