Spark 在 Pandas 数据帧转换期间如何处理时间戳类型?

tan*_*ngy 5 python datetime numpy apache-spark pyspark

我有一个 pandas 数据框,其中包含 pandas.tslib.Timestamp 类型的时间戳列。我查看了“createDataFrame”(链接到源)中的 pyspark 源代码,似乎它们将数据转换为 numpy 记录数组到列表:

data = [r.tolist() for r in data.to_records(index=False)]
Run Code Online (Sandbox Code Playgroud)

但是,时间戳类型在此过程中转换为长整型列表:

> df = pd.DataFrame(pd.date_range(start=datetime.datetime.now(),periods=5,freq='s'))
> df
0 2017-07-25 11:53:29.353923
1 2017-07-25 11:53:30.353923
2 2017-07-25 11:53:31.353923
3 2017-07-25 11:53:32.353923
4 2017-07-25 11:53:33.353923
> df.to_records(index=False).tolist()
[(1500983799614193000L,), (1500983800614193000L,), (1500983801614193000L,), (1500983802614193000L,), (1500983803614193000L,)]
Run Code Online (Sandbox Code Playgroud)

现在,如果我将这样的列表传递给 RDD,执行一些操作(不触及时间戳列),然后调用

> spark.createDataFrame(rdd,schema) // with schema mentioning that column as TimestampType
TypeError: TimestampType can not accept object 1465197332112000000L in type <type 'long'>
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

我应该做什么(在将列表转换为 RDD 之前)来保留日期时间类型。

编辑1

我知道的一些涉及数据帧创建后处理的方法是:

  1. 将时区信息添加到 pandas 中的 datetime 对象。然而,这似乎没有必要,并且可能会导致错误,具体取决于您自己的时区。

  2. 使用日期时间库将 long 转换为时间戳。

假设 tstamp 是输入: tstamp = datetime(1970, 1, 1) + timedelta(microseconds=tstampl/1000)

  1. 在 Pandas 数据帧端将日期时间转换为字符串,然后在 Spark 数据帧端转换为日期时间。

正如下面 Suresh 的回答所解释的

不过,我正在寻找一种更简单的方法,可以在数据帧创建本身之前处理所有处理。

Sur*_*esh 2

我尝试将时间戳列转换为字符串类型,然后在 pandas 系列上应用 tolist() 。使用 Spark 数据框中的列表并转换回时间戳。

>>> df = pd.DataFrame(pd.date_range(start=datetime.datetime.now(),periods=5,freq='s'))
>>> df
                    0
0 2017-07-25 21:51:53.963
1 2017-07-25 21:51:54.963
2 2017-07-25 21:51:55.963
3 2017-07-25 21:51:56.963
4 2017-07-25 21:51:57.963

>>> df1 = df[0].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
>>> type(df1)
<class 'pandas.core.series.Series'>
>>> df1.tolist()
['2017-07-25 21:51:53', '2017-07-25 21:51:54', '2017-07-25 21:51:55', '2017-07-25 21:51:56', '2017-07-25 21:51:57']

 from pyspark.sql.types import StringType,TimestampType
 >>> sdf = spark.createDataFrame(df1.tolist(),StringType())
 >>> sdf.printSchema()
 root
    |-- value: string (nullable = true)
 >>> sdf = sdf.select(sdf['value'].cast('timestamp'))
 >>> sdf.printSchema()
 root
    |-- value: timestamp (nullable = true)

 >>> sdf.show(5,False)
 +---------------------+
 |value                |
 +---------------------+
 |2017-07-25 21:51:53.0|
 |2017-07-25 21:51:54.0|
 |2017-07-25 21:51:55.0|
 |2017-07-25 21:51:56.0|
 |2017-07-25 21:51:57.0|
 +---------------------+
Run Code Online (Sandbox Code Playgroud)