Calculating duration by subtracting two datetime columns in string format

Jas*_*son 28 apache-spark apache-spark-sql pyspark

I have a Spark DataFrame containing a series of dates:

from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import *
sqlContext = SQLContext(sc)
import pandas as pd

rdd = sc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876','sip:4534454450'),
                      ('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321','sip:6413445440'),
                      ('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229','sip:4534437492'),
                      ('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881','sip:6474454453'),
                      ('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323','sip:8874458555')])
schema = StructType([StructField('ID', StringType(), True),
                     StructField('EndDateTime', StringType(), True),
                     StructField('StartDateTime', StringType(), True),
                     StructField('ANI', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)

What I'd like to do is find the Duration by subtracting StartDateTime from EndDateTime. I figured I'd try doing this with a function:

# Function to calculate time delta
def time_delta(y,x): 
    end = pd.to_datetime(y)
    start = pd.to_datetime(x)
    delta = (end-start)
    return delta

# create new RDD and add new column 'Duration' by applying time_delta function
df2 = df.withColumn('Duration', time_delta(df.EndDateTime, df.StartDateTime)) 

However, this just gives me:

>>> df2.show()
ID  EndDateTime          StartDateTime        ANI            Duration
X01 2014-02-13T12:36:... 2014-02-13T12:31:... sip:4534454450 null    
X02 2014-02-13T12:35:... 2014-02-13T12:32:... sip:6413445440 null    
X03 2014-02-13T12:36:... 2014-02-13T12:32:... sip:4534437492 null    
XO4 2014-02-13T12:37:... 2014-02-13T12:32:... sip:6474454453 null    
XO5 2014-02-13T12:36:... 2014-02-13T12:33:... sip:8874458555 null  

I'm not sure if my approach is correct. If not, I'd gladly accept another suggested way to achieve this.

ksi*_*ndi 44

As of Spark 1.5 you can use unix_timestamp:

from pyspark.sql import functions as F
timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS"
timeDiff = (F.unix_timestamp('EndDateTime', format=timeFmt)
            - F.unix_timestamp('StartDateTime', format=timeFmt))
df = df.withColumn("Duration", timeDiff)

Note the Java-style time format.

>>> df.show()
+---+--------------------+--------------------+--------+
| ID|         EndDateTime|       StartDateTime|Duration|
+---+--------------------+--------------------+--------+
|X01|2014-02-13T12:36:...|2014-02-13T12:31:...|     258|
|X02|2014-02-13T12:35:...|2014-02-13T12:32:...|     204|
|X03|2014-02-13T12:36:...|2014-02-13T12:32:...|     228|
|XO4|2014-02-13T12:37:...|2014-02-13T12:32:...|     269|
|XO5|2014-02-13T12:36:...|2014-02-13T12:33:...|     202|
+---+--------------------+--------------------+--------+

  • You can convert it to hours by dividing by 3600.0: df.withColumn("Duration_hours", df.Duration / 3600.0) (2 upvotes)
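
For newer Spark versions there is also a cast-based alternative that keeps the milliseconds unix_timestamp drops. A minimal sketch, assuming Spark 2.x or later (where the string-to-timestamp cast accepts the ISO-8601 'T' separator); the DurationSec column name is just illustrative:

from pyspark.sql import functions as F

# Cast the ISO-8601 strings to timestamps, then to epoch seconds as doubles;
# the double keeps the fractional (millisecond) part, unlike unix_timestamp,
# which returns whole seconds.
df = df.withColumn(
    "DurationSec",
    F.col("EndDateTime").cast("timestamp").cast("double")
    - F.col("StartDateTime").cast("timestamp").cast("double"))

# Hours, as in the comment above:
df = df.withColumn("Duration_hours", F.col("DurationSec") / 3600.0)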

Jas*_*son 15

Thanks to David Griffin. Here's how to do it, for future reference.

from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
from pyspark.sql.functions import udf

# Build sample data
rdd = sc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876'),
                      ('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321'),
                      ('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229'),
                      ('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881'),
                      ('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323')])
schema = StructType([StructField('ID', StringType(), True),
                     StructField('EndDateTime', StringType(), True),
                     StructField('StartDateTime', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)

# define timedelta function (obtain duration in seconds)
def time_delta(y, x):
    from datetime import datetime
    end = datetime.strptime(y, '%Y-%m-%dT%H:%M:%S.%f')
    start = datetime.strptime(x, '%Y-%m-%dT%H:%M:%S.%f')
    delta = (end - start).total_seconds()
    # cast to int so the value matches the IntegerType() declared below;
    # returning a raw float here yields null on Spark 1.6+
    return int(delta)

# register as a UDF
f = udf(time_delta, IntegerType())

# Apply function
df2 = df.withColumn('Duration', f(df.EndDateTime, df.StartDateTime)) 

Applying time_delta() gives you the duration in seconds:

>>> df2.show()
ID  EndDateTime          StartDateTime        Duration
X01 2014-02-13T12:36:... 2014-02-13T12:31:... 258     
X02 2014-02-13T12:35:... 2014-02-13T12:32:... 204     
X03 2014-02-13T12:36:... 2014-02-13T12:32:... 228     
XO4 2014-02-13T12:37:... 2014-02-13T12:32:... 268     
XO5 2014-02-13T12:36:... 2014-02-13T12:33:... 202 

  • This code no longer works. Duration comes out null. Using Zeppelin, Spark 1.6 (4 upvotes)
  • Please use (end-start).total_seconds(). Otherwise you get nasty surprises like this: time_delta('2014-02-13T12:36:14.000','2014-02-13T12:36:15.900') returns 86398 instead of -1.9 (2 upvotes)
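
Folding both comments in: a sketch of the same UDF declared with DoubleType, so the float that total_seconds() returns survives instead of coming back null (the time_delta_seconds and f_double names are just illustrative):

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from datetime import datetime

def time_delta_seconds(y, x):
    # total_seconds() keeps the sign and the fractional part, unlike the
    # .seconds attribute the comment above warns about
    fmt = '%Y-%m-%dT%H:%M:%S.%f'
    return (datetime.strptime(y, fmt) - datetime.strptime(x, fmt)).total_seconds()

f_double = udf(time_delta_seconds, DoubleType())
df2 = df.withColumn('Duration', f_double(df.EndDateTime, df.StartDateTime))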

j p*_*mar 10

datediff(Column end, Column start)

Returns the number of days from start to end.

https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html

  • I think this only works for dates, not datetimes (2 upvotes)
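
A minimal usage sketch against the sample DataFrame above, assuming the ISO-8601 strings cast cleanly to dates (the Days column name is just illustrative):

from pyspark.sql import functions as F

# datediff works at day granularity: the string columns are cast to dates and
# the time-of-day portion is ignored, so every sample row (all on 2014-02-13)
# yields 0 here.
df = df.withColumn('Days', F.datediff(df.EndDateTime, df.StartDateTime))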