使用Spark加载CSV文件

Ker*_*ael 95 python csv apache-spark pyspark

我是Spark的新手,我正在尝试使用Spark从文件中读取CSV数据.这就是我在做的事情:

sc.textFile('file.csv')
    .map(lambda line: (line.split(',')[0], line.split(',')[1]))
    .collect()
Run Code Online (Sandbox Code Playgroud)

我希望这个调用能给我一个我文件的两个第一列的列表,但是我收到了这个错误:

File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range
Run Code Online (Sandbox Code Playgroud)

虽然我的CSV文件不止一列.

zer*_*323 166

Spark 2.0.0+

您可以直接使用内置csv数据源:

spark.read.csv(
    "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)
Run Code Online (Sandbox Code Playgroud)

要么

(spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv"))
Run Code Online (Sandbox Code Playgroud)

不包括任何外部依赖项.

Spark <2.0.0:

我建议spark-csv:而不是手动解析,这在一般情况下远非微不足道.

确保星火CSV包含在路径(--packages,--jars,--driver-class-path)

并按如下方式加载数据:

(df = sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))
Run Code Online (Sandbox Code Playgroud)

它可以处理加载,模式推断,丢弃格式错误的行,并且不需要将数据从Python传递到JVM.

注意:

如果您了解架构,最好避免架构推断并将其传递给DataFrameReader.假设你有三列 - 整数,双精度和字符串:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

(sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))
Run Code Online (Sandbox Code Playgroud)

  • 如果这样做,请不要忘记在打开pyspark shell时使用databricks csv包或使用spark-submit.例如,`pyspark --packages com.databricks:spark-csv_2.11:1.4.0`(确保将databricks/spark版本更改为已安装的版本). (6认同)

G Q*_*ana 59

你确定所有的行都至少有2列吗?你可以尝试一下,只是为了检查?:

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)>1) \
    .map(lambda line: (line[0],line[1])) \
    .collect()
Run Code Online (Sandbox Code Playgroud)

或者,你可以打印罪魁祸首(如果有的话):

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)<=1) \
    .collect()
Run Code Online (Sandbox Code Playgroud)

  • 有很多工具可以解析csv,不要重新发明轮子 (4认同)
  • 最好使用内置的`csv`库进行解析以处理所有转义,因为如果值中包含逗号,则简单地按逗号分隔将不起作用。 (2认同)
  • 如果引号内有逗号,则此代码将中断。解析csv比仅在`“,”`分割要复杂得多。 (2认同)

小智 21

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|");

print(df.collect())
Run Code Online (Sandbox Code Playgroud)


小智 16

还有另一种选择,包括使用Pandas读取CSV文件,然后将Pandas DataFrame导入Spark.

例如:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)
Run Code Online (Sandbox Code Playgroud)

  • 如果他能够在熊猫中加载数据,为什么OP会想要做火花呢 (6认同)
  • 学术目的 (2认同)

Gal*_*ong 16

简单地用逗号分割也会分割字段内的逗号(例如a,b,"1,2,3",c),所以不推荐使用逗号.如果你想使用DataFrames API,zero323的答案是好的,但是如果你想坚持基础Spark,你可以用csv模块解析基础Python中的csvs :

# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))
Run Code Online (Sandbox Code Playgroud)

编辑:正如@muon在评论中提到的,这将像任何其他行一样处理标题,因此您需要手动提取它.例如,header = rdd.first(); rdd = rdd.filter(lambda x: x != header)(确保header在过滤器评估之前不要修改).但是在这一点上,你最好使用内置的csv解析器.


小智 10

这是在 PYSPARK

path="Your file path with file name"

df=spark.read.format("csv").option("header","true").option("inferSchema","true").load(path)
Run Code Online (Sandbox Code Playgroud)

然后你可以检查

df.show(5)
df.count()
Run Code Online (Sandbox Code Playgroud)


abb*_*obh 6

这与JP Mercier 最初关于使用 Pandas 的建议一致,但有一个重大修改:如果将数据分块读入 Pandas,它应该更具延展性。意思是,您可以解析比 Pandas 可以实际处理的大得多的文件,并将其以更小的尺寸传递给 Spark。(这也回答了关于如果他们无论如何都可以将所有内容加载到 Pandas 中为什么要使用 Spark 的评论。)

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)

for chunky in chunk_100k:
    Spark_Full +=  sc.parallelize(chunky.values.tolist())

YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()
Run Code Online (Sandbox Code Playgroud)


opt*_*ist 5

现在,对于任何常规的csv文件,还有另一个选项:https : //github.com/seahboonsiew/pyspark-csv,如下所示:

假设我们有以下上下文

sc = SparkContext
sqlCtx = SQLContext or HiveContext
Run Code Online (Sandbox Code Playgroud)

首先,使用SparkContext将pyspark-csv.py分发给执行者

import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')
Run Code Online (Sandbox Code Playgroud)

通过SparkContext读取CSV数据并将其转换为DataFrame

plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)
Run Code Online (Sandbox Code Playgroud)


iec*_*007 5

如果您的 csv 数据恰好在任何字段中不包含换行符,您可以加载数据并textFile()解析它

import csv
import StringIO

def loadRecord(line):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name1", "name2"])
    return reader.next()

input = sc.textFile(inputFile).map(loadRecord)
Run Code Online (Sandbox Code Playgroud)


Jer*_*ril 5

如果要将csv加载为数据帧,则可以执行以下操作:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('sampleFile.csv') # this is your csv file
Run Code Online (Sandbox Code Playgroud)

对我来说很好。