I have a pandas DataFrame that I want to create/load into a Hive table using PySpark.
pd_df = pandas data frame
id int64
TEST_TIME datetime64[ns]
status_time object
GROUP object
test_type object
dtype: object
id TEST_TIME status_time GROUP test_type
0 1 2017-03-12 02:19:51 Driver started
1 2 2017-03-12 02:19:53 2017-03-11 18:13:43.577 ALARM AL_PT2334_L
2 3 2017-03-12 02:19:53 2017-03-11 18:13:43.577 ALARM AL_Turb_CNet_Ch_A_Fault
3 4 2017-03-12 02:19:53 2017-03-11 18:13:43.577 ALARM AL_Encl_Fire_Sys_Trouble
4 5 2017-03-12 02:19:54 2017-03-11 18:13:44.611 STATUS ST_Engine_Turning_Mode
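For reference, a minimal sketch that reconstructs a pandas frame with the same dtypes (the values below are illustrative placeholders taken from the rows above, not the full data):

import pandas as pd

# Hypothetical reconstruction of pd_df based on the dtypes and sample rows shown.
pd_df = pd.DataFrame({
    "id": [1, 2],
    "TEST_TIME": pd.to_datetime(["2017-03-12 02:19:51", "2017-03-12 02:19:53"]),
    "status_time": [None, "2017-03-11 18:13:43.577"],
    "GROUP": [None, "ALARM"],
    "test_type": ["Driver started", "AL_PT2334_L"],
})
print(pd_df.dtypes)  # TEST_TIME comes out as datetime64[ns]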
Now I convert the pandas DataFrame to a Spark DataFrame as follows:
spark_df = sqlContext.createDataFrame(pd_df)
+---+-------------------+--------------------+------+--------------------+
| id| TEST_TIME| status_time| GROUP| test_type|
+---+-------------------+--------------------+------+--------------------+
| 1|1489285191000000000| | | Driver started|
| 2|1489285193000000000|2017-03-11 18:13:...| ALARM| AL_PT2334_L|
| 3|1489285193000000000|2017-03-11 18:13:...| ALARM|AL_Turb_CNet_Ch_A...|
| 4|1489285193000000000|2017-03-11 18:13:...| ALARM|AL_Encl_Fire_Sys_...|
| 5|1489285194000000000|2017-03-11 18:13:...|STATUS|ST_Engine_Turning...|
+---+-------------------+--------------------+------+--------------------+
DataFrame[id: bigint, TEST_TIME: bigint, status_time: string, GROUP: string, test_type: string]
I want the TEST_TIME column to be a timestamp column, but I get bigint instead (the values appear to be nanoseconds since the epoch). I want TEST_TIME to remain a timestamp in spark_df, just as it is in pd_df.
While converting the pandas DataFrame to a Spark DataFrame, I also tried the following:
from pyspark.sql import functions as F

spark_df = sqlContext.createDataFrame(pd_df).withColumn("TEST_TIME", (F.unix_timestamp("TEST_TIME") + 28800).cast('timestamp'))
and got the following error:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/cloudera/parcels/CDH-5.8.0-1.cdh5.8.0.p0.42/lib/spark/python/pyspark/sql/dataframe.py", line 1314, in withColumn
return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx)
File "/opt/cloudera/parcels/CDH-5.8.0-1.cdh5.8.0.p0.42/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/opt/cloudera/parcels/CDH-5.8.0-1.cdh5.8.0.p0.42/lib/spark/python/pyspark/sql/utils.py", line 51, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot resolve 'unixtimestamp(TEST_TIME,yyyy-MM-dd HH:mm:ss)' due to data type mismatch: argument 1 requires (string or date or timestamp) type, however, 'TEST_TIME' is of bigint type.;"
How can I achieve what I want?
Convert the datetime64 column of the pandas DataFrame to Python datetime objects, like this:
pd_df['TEST_TIME'] = pandas.Series(pd_df['TEST_TIME'].dt.to_pydatetime(), dtype=object)
Then create the Spark DataFrame as you did before.
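Putting this together with the original goal of loading into a Hive table, a minimal end-to-end sketch might look like the following (assuming sqlContext is a HiveContext; the table name my_hive_table is a placeholder):

import pandas as pd

# Convert the datetime64[ns] column to Python datetime objects so that
# createDataFrame infers TimestampType instead of a bigint of nanoseconds.
pd_df['TEST_TIME'] = pd.Series(pd_df['TEST_TIME'].dt.to_pydatetime(), dtype=object)

spark_df = sqlContext.createDataFrame(pd_df)
spark_df.printSchema()  # TEST_TIME should now show as timestamp

# Assumed final step, not shown in the answer: persist to a Hive table.
spark_df.write.mode("overwrite").saveAsTable("my_hive_table")

The key point is that Spark's schema inference maps Python datetime objects to TimestampType, whereas a raw datetime64[ns] column arrives as nanosecond integers, which is exactly the bigint seen in the question's output.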