Tags: apache-spark, apache-spark-sql, pyspark, pyspark-sql
I have a very strange error: a Spark DataFrame causes a string to be evaluated as a timestamp.

Here is my setup code:
from datetime import datetime
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
new_schema = StructType([StructField("item_id", StringType(), True),
                         StructField("date", TimestampType(), True),
                         StructField("description", StringType(), True)])
df = sqlContext.createDataFrame([Row(description='description', date=datetime.utcnow(), item_id='id_string')], new_schema)
This gives me the following error:
AttributeError                            Traceback (most recent call last)
<ipython-input> in <module>()
----> 1 df = sqlContext.createDataFrame([Row(description='hey', date=datetime.utcnow(), item_id='id_string')], new_schema)

/home/florian/spark/python/pyspark/sql/context.pyc in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    307             Py4JJavaError: ...
    308         """
--> 309         return self.sparkSession.createDataFrame(data, schema, samplingRatio, verifySchema)
    310
    311     @since(1.3)

/home/florian/spark/python/pyspark/sql/session.pyc in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    522             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    523         else:
--> 524             rdd, schema = self._createFromLocal(map(prepare, data), schema)
    525         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    526         jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

/home/florian/spark/python/pyspark/sql/session.pyc in _createFromLocal(self, data, schema)
    397
    398         # convert python objects to sql data
--> 399         data = [schema.toInternal(row) for row in data]
    400         return self._sc.parallelize(data), schema
    401

/home/florian/spark/python/pyspark/sql/types.pyc in toInternal(self, obj)
    574                 return tuple(f.toInternal(obj.get(n)) for n, f in zip(self.names, self.fields))
    575             elif isinstance(obj, (tuple, list)):
--> 576                 return tuple(f.toInternal(v) for f, v in zip(self.fields, obj))
    577             elif hasattr(obj, "__dict__"):
    578                 d = obj.__dict__

/home/florian/spark/python/pyspark/sql/types.pyc in <genexpr>((f, v))
    574                 return tuple(f.toInternal(obj.get(n)) for n, f in zip(self.names, self.fields))
    575             elif isinstance(obj, (tuple, list)):
--> 576                 return tuple(f.toInternal(v) for f, v in zip(self.fields, obj))
    577             elif hasattr(obj, "__dict__"):
    578                 d = obj.__dict__

/home/florian/spark/python/pyspark/sql/types.pyc in toInternal(self, obj)
    434
    435     def toInternal(self, obj):
--> 436         return self.dataType.toInternal(obj)
    437
    438     def fromInternal(self, obj):

/home/florian/spark/python/pyspark/sql/types.pyc in toInternal(self, dt)
    188     def toInternal(self, dt):
    189         if dt is not None:
--> 190             seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
    191                        else time.mktime(dt.timetuple()))
    192             return int(seconds * 1e6 + dt.microsecond)

AttributeError: 'str' object has no attribute 'tzinfo'
It looks as if a string is being passed to TimestampType.toInternal().

What is really strange is that this DataFrame raises the same error:
df = sqlContext.createDataFrame([Row(description='hey', date=None, item_id='id_string')], new_schema)
while this one works:
df = sqlContext.createDataFrame([Row(description=None, date=datetime.now(), item_id='id_string')], new_schema)
and this one works as well:
df = sqlContext.createDataFrame([Row(description=None, date=datetime.now(), item_id=None)], new_schema)
To me this suggests that pyspark somehow puts the value of "item_id" into the "date" column and therefore raises this error. Am I doing something wrong, or is this a bug in DataFrames?

Info: I am using pyspark 2.0.1.
Edit:
df = sqlContext.createDataFrame([Row(description=None, date=datetime.now(), item_id=None)], new_schema)
df.first()
Row(item_id=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2017,MONTH=1,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=3,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=1,HOUR=3,HOUR_OF_DAY=15,MINUTE=19,SECOND=30,MILLISECOND=85,ZONE_OFFSET=?,DST_OFFSET=?]', date=None, description=None)
When you create a Row object, the fields are sorted alphabetically (http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.Row), so when you create the object Row(description, date, item_id), its fields will be ordered as (date, description, item_id).

As your schema is ordered StringType, TimestampType, StringType, when creating a DataFrame with this Row and schema, Spark will map the contents of date to a StringType, the contents of description to a TimestampType, and the contents of item_id to a StringType.

Passing a timestamp (in datetime format) to a StringType does not cause an error, but passing a string to a TimestampType does, because it asks for the tzinfo attribute, which, as the error states, a str object does not have.

Also, the reason the other DataFrames worked for you is that None was being passed to the TimestampType position of your schema, and None is an acceptable value there.
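The mismatch can be reproduced without Spark at all. The sketch below (a pure-Python illustration, not pyspark code; the variable names are my own) mimics the two steps involved: Row(**kwargs) storing values in alphabetical key order, and StructType.toInternal zipping the resulting tuple with the schema fields positionally:

```python
from datetime import datetime

# Schema order as declared in the question:
schema_fields = ["item_id", "date", "description"]

# Keyword arguments from the failing example:
row_kwargs = {"description": "description",
              "date": datetime(2017, 2, 3),
              "item_id": "id_string"}

# Step 1: pyspark 2.0.x Row sorts keyword arguments alphabetically,
# so the stored tuple is ordered (date, description, item_id).
row_values = tuple(row_kwargs[k] for k in sorted(row_kwargs))

# Step 2: the schema is zipped with the tuple positionally.
pairing = dict(zip(schema_fields, row_values))
print(pairing)
# 'date' now holds the string "description", so TimestampType.toInternal
# receives a str and fails looking for .tzinfo
```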
Building on the answer from @rafael-zanetti above, you can do the following to sort your schema columns:
new_schema = [StructField("item_id", StringType(), True),
StructField("date", TimestampType(), True),
StructField("description", StringType(), True)]
new_schema = StructType(sorted(new_schema, key=lambda f: f.name))
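To see why sorting the schema restores alignment, here is the same pure-Python sketch as above (no Spark required, names are illustrative): once the schema fields are sorted by name, their order matches Row's alphabetical field order, so the positional zip pairs every value with the right field:

```python
from datetime import datetime

# Schema fields sorted by name, mirroring
# sorted(new_schema, key=lambda f: f.name):
schema_fields = sorted(["item_id", "date", "description"])

row_kwargs = {"description": "description",
              "date": datetime(2017, 2, 3),
              "item_id": "id_string"}

# Row's alphabetical order now coincides with the schema order:
row_values = tuple(row_kwargs[k] for k in sorted(row_kwargs))
pairing = dict(zip(schema_fields, row_values))
print(pairing)
# every field now receives its own value; 'date' gets the datetime
```

An alternative, if you prefer to keep the schema order, is to fix the field order on the Row side instead, e.g. by declaring a Row class positionally (MyRow = Row("item_id", "date", "description")) and instantiating it with positional values.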
Viewed: 3389 times