Fis*_*ane 13 dataframe pandas apache-spark apache-spark-sql pyspark
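I have CSV data and used read_csv to create a Pandas DataFrame, forcing all columns to be strings. When I then try to create a Spark DataFrame from the Pandas DataFrame, I get the error message below.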
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# Read every column as a string
z = pd.read_csv("mydata.csv", dtype=str)
z.info()
<class 'pandas.core.frame.DataFrame'>
Int64Index: 74044003 entries, 0 to 74044002
Data columns (total 12 columns):
primaryid object
event_dt object
age object
age_cod object
age_grp object
sex object
occr_country object
drug_seq object
drugname object
route object
outc_cod object
pt object
q = sqlContext.createDataFrame(z)
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 425, in createDataFrame
    rdd, schema = self._createFromLocal(data, schema)
  File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 341, in _createFromLocal
    struct = self._inferSchemaFromList(data)
  File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 241, in _inferSchemaFromList
    schema = reduce(_merge_type, map(_infer_schema, data))
  File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/types.py", line 862, in _merge_type
    for f in a.fields]
  File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/types.py", line 856, in _merge_type
    raise TypeError("Can not merge type %s and %s" % (type(a), type(b)))
TypeError: Can not merge type <class 'pyspark.sql.types.DoubleType'> and <class 'pyspark.sql.types.StringType'>
Here is a reproducible example. I download public data and create a pandas DataFrame, but Spark won't create a Spark DataFrame from that pandas DataFrame.
import io
import zipfile

import pandas as pd
import requests
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

url = "http://www.nber.org/fda/faers/2016/demo2016q1.csv.zip"
r = requests.get(url)
# Unpack the archive from memory (io.BytesIO works on both Python 2 and 3)
archive = zipfile.ZipFile(io.BytesIO(r.content))
archive.extractall()
z = pd.read_csv("demo2016q1.csv")  # creates pandas dataframe
# sqlContext is predefined in the pyspark shell
Data_Frame = sqlContext.createDataFrame(z)
zer*_*323 17
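Long story short: don't depend on schema inference. It is expensive and tricky in general. In particular, some columns in your data (for example event_dt_num) have missing values, which pushes Pandas to represent them as mixed types (strings for non-missing values, NaN for missing ones). NaN is a float, so Spark infers DoubleType for those cells and StringType for the rest, and the two cannot be merged, which is exactly the error in your traceback.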
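To see where the conflicting types come from, here is a minimal sketch using only pandas and plain Python. The CSV text is hypothetical, and `infer_type` / `merge_types` are a deliberately simplified toy model of the infer-per-row-then-merge step visible in the traceback (`reduce(_merge_type, map(_infer_schema, data))`), not Spark's actual implementation.

```python
import io
from functools import reduce

import pandas as pd

# Hypothetical two-row CSV; the second row has an empty event_dt_num field.
csv_text = "primaryid,event_dt_num\n1,20160101\n2,\n"

# Even with dtype=str, pandas stores the missing field as a float NaN.
pdf = pd.read_csv(io.StringIO(csv_text), dtype=str)
print([type(v).__name__ for v in pdf["event_dt_num"]])  # ['str', 'float']


def infer_type(value):
    """Toy per-value inference: Python float -> 'double', str -> 'string'."""
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    raise TypeError("unsupported value: %r" % (value,))


def merge_types(a, b):
    """Toy merge: identical types merge, anything else is a conflict."""
    if a != b:
        raise TypeError("Can not merge type %s and %s" % (a, b))
    return a


row_types = [infer_type(v) for v in pdf["event_dt_num"]]
try:
    reduce(merge_types, row_types)
except TypeError as exc:
    print(exc)  # Can not merge type string and double
```

The real inference handles many more types (and treats `None` as a null type that merges with anything), but the failure mode is the same: one column, two incompatible per-row types.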
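When in doubt, it is better to read all data as strings and cast afterwards. If you have access to the codebook, you should always provide a schema to avoid problems and reduce the overall cost.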
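If the data has to go through pandas anyway, one common workaround (not part of this answer's recommendation) is to replace pandas' NaN markers with `None`, which Spark treats as null instead of as a double, and convert types only afterwards. A hedged sketch with hypothetical data; the conversion here uses pandas' `pd.to_numeric`, whereas on the Spark side the analogous step would be `Column.cast`.

```python
import io

import pandas as pd

# Hypothetical CSV with a missing age value
csv_text = "primaryid,age\n1,34.5\n2,\n"

# Read every column as text; missing fields still come back as NaN
pdf = pd.read_csv(io.StringIO(csv_text), dtype=str)

# Cast to object first so None is kept rather than turned back into NaN
as_text = pdf.astype(object).where(pdf.notnull(), None)
print(as_text["age"].tolist())  # ['34.5', None]

# Convert afterwards, column by column, once the intended types are known
typed = as_text.copy()
typed["primaryid"] = pd.to_numeric(typed["primaryid"]).astype("int64")
typed["age"] = pd.to_numeric(typed["age"])  # None becomes NaN in a float column
print(typed.dtypes.to_dict())
```

Every cell of `as_text` is now either a `str` or `None`, so schema inference on it sees only one non-null type per column.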
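Finally, passing data from the driver is an anti-pattern. You should be able to read this data directly using the csv format (Spark 2.0.0+) or the spark-csv library (Spark 1.6 and below):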
df = (spark.read.format("csv").options(header="true")
      .load("/path/to/demo2016q1.csv"))
df.printSchema()
## root
## |-- primaryid: string (nullable = true)
## |-- caseid: string (nullable = true)
## |-- caseversion: string (nullable = true)
## |-- i_f_code: string (nullable = true)
## |-- i_f_code_num: string (nullable = true)
## ...
## |-- to_mfr: string (nullable = true)
## |-- occp_cod: string (nullable = true)
## |-- reporter_country: string (nullable = true)
## |-- occr_country: string (nullable = true)
## |-- occp_cod_num: string (nullable = true)
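In this particular case, adding the inferSchema="true" option should work as well, but it is still better to avoid it. You can also provide a schema as follows: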
from pyspark.sql.types import StructType

schema = StructType.fromJson({'fields': [
    {'metadata': {}, 'name': 'primaryid', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'caseid', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'caseversion', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'i_f_code', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'i_f_code_num', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'event_dt', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'event_dt_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'mfr_dt', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'mfr_dt_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'init_fda_dt', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'init_fda_dt_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'fda_dt', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'fda_dt_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'rept_cod', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'rept_cod_num', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'auth_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'mfr_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'mfr_sndr', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'lit_ref', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'age', 'nullable': True, 'type': 'double'},
    {'metadata': {}, 'name': 'age_cod', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'age_grp', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'age_grp_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'sex', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'e_sub', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'wt', 'nullable': True, 'type': 'double'},
    {'metadata': {}, 'name': 'wt_cod', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'rept_dt', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'rept_dt_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'to_mfr', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'occp_cod', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'reporter_country', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'occr_country', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'occp_cod_num', 'nullable': True, 'type': 'integer'}],
    'type': 'struct'})
and pass it directly to the reader:
df = (spark.read.schema(schema).format("csv").options(header="true")
      .load("/path/to/demo2016q1.csv"))
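The JSON handed to `StructType.fromJson` in the schema above is very repetitive. A hedged sketch of one way to generate the same kind of payload from a plain list of (name, type) pairs; `schema_json` is a hypothetical helper, not a PySpark function, and only a few of the columns are listed. With PySpark available, the resulting dict could be passed to `StructType.fromJson` as in the answer.

```python
# Hypothetical helper: build the dict accepted by StructType.fromJson
# from an ordered list of (column name, Spark SQL type name) pairs.
def schema_json(columns, nullable=True):
    return {
        "type": "struct",
        "fields": [
            {"metadata": {}, "name": name, "nullable": nullable, "type": type_name}
            for name, type_name in columns
        ],
    }


# A few columns of the demo2016q1.csv schema from the answer
columns = [
    ("primaryid", "integer"),
    ("caseid", "integer"),
    ("caseversion", "integer"),
    ("i_f_code", "string"),
    ("age", "double"),
]

payload = schema_json(columns)
print(payload["fields"][0])
# With pyspark installed: StructType.fromJson(payload)
```

Keeping the column list as data makes it easier to maintain alongside a codebook than a hand-written nested dict.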