将Pandas数据帧转换为Spark数据帧错误

Ива*_*дос 32 python pandas apache-spark spark-dataframe

我正在尝试将Pandas DF转换为Spark.DF头:

10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543
10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611
10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691
Run Code Online (Sandbox Code Playgroud)

码:

dataset = pd.read_csv("data/AS/test_v2.csv")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(dataset)
Run Code Online (Sandbox Code Playgroud)

我收到一个错误:

TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
Run Code Online (Sandbox Code Playgroud)

Gra*_*non 26

通过强制模式可以避免类型相关的错误,如下所示:

注意:创建了一个文本文件(test.csv),其中包含原始数据(如上所述)和插入的假设列名称("col1","col2",...,"col25").

import pyspark
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()

pdDF = pd.read_csv("test.csv")
Run Code Online (Sandbox Code Playgroud)

pandas数据框的内容:

pdDF

col1    col2    col3    col4    col5    col6    col7    col8    col9    col10   ... col16   col17   col18   col19   col20   col21   col22   col23   col24   col25
0   10000001    1   0   1   12:35   OK  10002   1   0   9   ... 3   9   0   0   1   1   0   0   4   543
1   10000001    2   0   1   12:36   OK  10002   1   0   9   ... 3   9   2   1   1   3   1   3   2   611
2   10000002    1   0   4   12:19   PA  10003   1   1   7   ... 2   15  2   0   2   3   1   2   2   691
Run Code Online (Sandbox Code Playgroud)

接下来,创建架构:

from pyspark.sql.types import *

mySchema = StructType([ StructField("Col1", LongType(), True)\
                       ,StructField("Col2", IntegerType(), True)\
                       ,StructField("Col3", IntegerType(), True)\
                       ,StructField("Col4", IntegerType(), True)\
                       ,StructField("Col5", StringType(), True)\
                       ,StructField("Col6", StringType(), True)\
                       ,StructField("Col7", IntegerType(), True)\
                       ,StructField("Col8", IntegerType(), True)\
                       ,StructField("Col9", IntegerType(), True)\
                       ,StructField("Col10", IntegerType(), True)\
                       ,StructField("Col11", StringType(), True)\
                       ,StructField("Col12", StringType(), True)\
                       ,StructField("Col13", IntegerType(), True)\
                       ,StructField("Col14", IntegerType(), True)\
                       ,StructField("Col15", IntegerType(), True)\
                       ,StructField("Col16", IntegerType(), True)\
                       ,StructField("Col17", IntegerType(), True)\
                       ,StructField("Col18", IntegerType(), True)\
                       ,StructField("Col19", IntegerType(), True)\
                       ,StructField("Col20", IntegerType(), True)\
                       ,StructField("Col21", IntegerType(), True)\
                       ,StructField("Col22", IntegerType(), True)\
                       ,StructField("Col23", IntegerType(), True)\
                       ,StructField("Col24", IntegerType(), True)\
                       ,StructField("Col25", IntegerType(), True)])
Run Code Online (Sandbox Code Playgroud)

注意 :( True暗示允许为空)

创建pyspark数据帧:

df = spark.createDataFrame(pdDF,schema=mySchema)
Run Code Online (Sandbox Code Playgroud)

确认pandas数据框现在是一个pyspark数据框:

type(df)
Run Code Online (Sandbox Code Playgroud)

输出:

pyspark.sql.dataframe.DataFrame
Run Code Online (Sandbox Code Playgroud)

  • 是否可以将模式创建部分概括为仅将所有列创建为某种类型?例如,只需告诉它所有列都为 StringType(而不是单独分配每一列) (2认同)

mad*_*890 24

您需要确保您的pandas数据帧列适用于spark推断的类型.如果您的pandas数据框列出如下内容:

pd.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5062 entries, 0 to 5061
Data columns (total 51 columns):
SomeCol                    5062 non-null object
Col2                       5062 non-null object
Run Code Online (Sandbox Code Playgroud)

你得到的错误尝试:

df[['SomeCol', 'Col2']] = df[['SomeCol', 'Col2']].astype(str)
Run Code Online (Sandbox Code Playgroud)

现在,确保.astype(str)实际上是您希望这些列的类型.基本上,当底层Java代码试图从python中的对象推断出类型时,它使用一些观察并猜测,如果该猜测不适用于它试图从pandas转换为列的所有数据.火花会失败.


Gon*_*cia 15

我做了这个算法,它适用于我的10个熊猫数据帧

from pyspark.sql.types import *

# Auxiliar functions
def equivalent_type(f):
    if f == 'datetime64[ns]': return DateType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return FloatType()
    else: return StringType()

def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except: typo = StringType()
    return StructField(string, typo)


# Given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types): 
      struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return sqlContext.createDataFrame(pandas_df, p_schema)
Run Code Online (Sandbox Code Playgroud)

您也可以在这个要点中看到它

有了这个,你只需要打电话 spark_df = pandas_to_spark(pandas_df)

  • 验证了这一切是否有效,还验证了输出从 pyspark 输出到 parquet 并输入 scala。谢谢贡萨洛。我不知道如何做,但这似乎是对开源社区的杰出贡献。也许像 pd.to_sparkdf() 之类的。 (2认同)
  • Gonzalo,我刚刚分叉了你的要点来支持 ArrayType[StringType]。再次感谢。读者们,这是从 pandas 到 pyspark 和 scala Spark 的绝佳解决方案。 (2认同)
  • 警告:如果您尝试转换由日期和时间组成的日期时间对象(例如“pd.to_datetime('2020-01-01 13:45:12')”),则时间信息会随着您的方法而丢失。为了解决这个问题,请将“DateType()”更改为“TimestampType()”。 (2认同)

NuO*_*One 14

在 spark 版本 >= 3 中,您可以在一行中将 Pandas 数据帧转换为 pyspark 数据帧

使用 spark.createDataFrame(pandasDF)

dataset = pd.read_csv("data/AS/test_v2.csv")

sparkDf = spark.createDataFrame(dataset);
Run Code Online (Sandbox Code Playgroud)

如果你对火花会话变量感到困惑,火花会话如下

sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

spark = SparkSession \
    .builder \
    .getOrCreate()
Run Code Online (Sandbox Code Playgroud)


Roy*_*eIX 8

我用你的数据尝试了这个,它正在工作:

%pyspark
import pandas as pd
from pyspark.sql import SQLContext
print sc
df = pd.read_csv("test.csv")
print type(df)
print df
sqlCtx = SQLContext(sc)
sqlCtx.createDataFrame(df).show()
Run Code Online (Sandbox Code Playgroud)