Spark Sql:TypeError("StructType不能接受类型%s中的对象"%type(obj))

Thi*_*Eye 8 python apache-spark apache-spark-sql spark-dataframe

我目前正在使用PyODBC从SQL Server中提取数据并尝试以近实时(NRT)方式插入Hive中的表.

我从源代码获得了一行并转换为List [Strings]并以编程方式创建了schema,但在创建DataFrame时,Spark抛出了StructType错误.

>>> cnxn = pyodbc.connect(con_string)
>>> aj = cnxn.cursor()
>>>
>>> aj.execute("select * from tjob")
<pyodbc.Cursor object at 0x257b2d0>

>>> row = aj.fetchone()

>>> row
(1127, u'', u'8196660', u'', u'', 0, u'', u'', None, 35, None, 0, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, u'', 0, None, None)
>>> rowstr = map(str,row)
>>> rowstr
['1127', '', '8196660', '', '', '0', '', '', 'None', '35', 'None', '0', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', '', '0', 'None', 'None']

>>> schemaString = " ".join([row.column_name for row in aj.columns(table='tjob')])

>>> schemaString
u'ID ExternalID Name Description Notes Type Lot SubLot ParentJobID ProductID PlannedStartDateTime PlannedDurationSeconds Capture01 Capture02 Capture03 Capture04 Capture05 Capture06 Capture07 Capture08 Capture09 Capture10 Capture11 Capture12 Capture13 Capture14 Capture15 Capture16 Capture17 Capture18 Capture19 Capture20 User UserState ModifiedDateTime UploadedDateTime'

>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
>>> schema = StructType(fields)

>>> [f.dataType for f in schema.fields]
[StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType]

>>> myrdd = sc.parallelize(rowstr)

>>> myrdd.collect()
['1127', '', '8196660', '', '', '0', '', '', 'None', '35', 'None', '0', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', '', '0', 'None', 'None']

>>> schemaPeople = sqlContext.createDataFrame(myrdd, schema)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/apps/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/sql/context.py", line 404, in createDataFrame
    rdd, schema = self._createFromRDD(data, schema, samplingRatio)
  File "/apps/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/sql/context.py", line 298, in _createFromRDD
    _verify_type(row, schema)
  File "/apps/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/sql/types.py", line 1132, in _verify_type
    raise TypeError("StructType can not accept object in type %s" % type(obj))
TypeError: StructType can not accept object in type <type 'str'>
Run Code Online (Sandbox Code Playgroud)

Shy*_*nki 12

这是错误消息的原因:

>>> rowstr
['1127', '', '8196660', '', '', '0', '', '', 'None' ... ]   
#rowstr is a list of str

>>> myrdd = sc.parallelize(rowstr)
#myrdd is a rdd of str

>>> schema = StructType(fields)
#schema is StructType([StringType, StringType, ....])

>>> schemaPeople = sqlContext.createDataFrame(myrdd, schema)
#myrdd should have been RDD([StringType, StringType,...]) but is RDD(str)
Run Code Online (Sandbox Code Playgroud)

解决这个问题,使RDD成为合适的类型:

>>> myrdd = sc.parallelize([rowstr])
Run Code Online (Sandbox Code Playgroud)

  • @adelinor:当您处理单例(单元素元组)时,逗号尤其重要;创建单元素元组需要尾随逗号。 (4认同)
  • 这是一个非常有帮助的答案。我花了一个多小时陷入误导性错误 TypeError: StructType can not Accept object 123 in type &lt;class 'int'&gt; 。感谢这篇文章,我明白我需要使用表达式spark.createDataFrame([(123,)], my_schema)而不是spark.createDataFrame([123], my_schema)创建数据框 (3认同)
  • 至于“适当的类型”是什么?str 的列表?sc.parallelize()? (2认同)