如何跳过Spark中CSV文件的标题?

Haf*_*did 67 csv scala apache-spark

假设我给三个文件路径指向要读取的Spark上下文,并且每个文件在第一行中都有一个模式.我们如何从头文件中跳过架构线?

val rdd=sc.textFile("file1,file2,file3")
Run Code Online (Sandbox Code Playgroud)

现在,我们如何跳过此rdd的标题行?

Jim*_*mmy 89

data = sc.textFile('path_to_data')
header = data.first() #extract header
data = data.filter(row => row != header)   #filter out header
Run Code Online (Sandbox Code Playgroud)

  • 该问题询问如何跳过csv文件中的标题,如果标题始终存在,它们将出现在第一行中. (6认同)
  • 这并非总是如此.如果你用Spark写出一个csv,可能会有多个文件,每个文件都有自己的标题.使用它作为另一个Spark程序的输入将为您提供多个标题.此外,您可以使用Spark在一个文件中输入多个文件. (3认同)

Sea*_*wen 62

如果第一条记录中只有一个标题行,那么过滤它的最有效方法是:

rdd.mapPartitionsWithIndex {
  (idx, iter) => if (idx == 0) iter.drop(1) else iter 
}
Run Code Online (Sandbox Code Playgroud)

如果当然有许多文件中包含许多标题行,这没有用.事实上,你可以通过这种方式结合三个RDD.

你也可以写一个filter只匹配一个可能是标题的行.这很简单,但效率较低.

Python等价物:

from itertools import islice

rdd.mapPartitionsWithIndex(
    lambda idx, it: islice(it, 1, None) if idx == 0 else it 
)
Run Code Online (Sandbox Code Playgroud)

  • 过滤方法仍然比另一个答案中提出的`zipWithIndex`方法更有效. (4认同)

San*_*hit 56

在Spark 2.0中,CSV阅读器内置于Spark中,因此您可以轻松加载CSV文件,如下所示:

spark.read.option("header","true").csv("filePath")
Run Code Online (Sandbox Code Playgroud)

  • @ciri spark不是SparkStext对象的SparkSession对象,所以如果你想使用csv reader你需要SparkSession对象 (10认同)

Shi*_*nsh 13

Spark 2.0开始,您可以使用SparkSession将其作为一个内容完成:

val spark = SparkSession.builder.config(conf).getOrCreate()
Run Code Online (Sandbox Code Playgroud)

然后作为@SandeepPurohit说:

val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath)
Run Code Online (Sandbox Code Playgroud)

我希望它能解决你的问题!

PS:SparkSession是Spark 2.0中引入的新入口点,可以在spark_sql包找到


hay*_*ayj 7

在PySpark中,您可以使用数据框并将标头设置为True:

df = spark.read.csv(dataPath, header=True)
Run Code Online (Sandbox Code Playgroud)


Ant*_*uan 7

2018 年工作 (Spark 2.3)

Python

df = spark.read
    .option("header", "true")
    .format("csv")
    .schema(myManualSchema)
    .load("mycsv.csv")
Run Code Online (Sandbox Code Playgroud)

斯卡拉

val myDf = spark.read
  .option("header", "true")
  .format("csv")
  .schema(myManualSchema)
  .load("mycsv.csv")
Run Code Online (Sandbox Code Playgroud)

PD1: myManualSchema 是我写的预定义架构,你可以跳过那部分代码

更新 2021 相同的代码适用于 Spark 3.x

df = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .format("csv")
    .csv("mycsv.csv")
Run Code Online (Sandbox Code Playgroud)

  • @Antonio Cachuan,你应该提供可以工作的代码,没有你的个人示例“schema(myManualSchema)”根本不应该出现在解决方案中。 (2认同)

pze*_*vic 5

您可以单独加载每个文件,过滤它们file.zipWithIndex().filter(_._2 > 0)然后联合所有文件RDD.

如果文件数太大,联盟可能会抛出一个StackOverflowExeption.