如何使用spark-csv包在HDFS上只读取n行大型CSV文件?

Abh*_*hek 12 hdfs apache-spark apache-spark-sql pyspark spark-csv

我在HDFS上有一个很大的分布式文件,每次我使用带有spark-csv包的sqlContext时,它首先加载整个文件,这需要相当长的时间.

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path")
Run Code Online (Sandbox Code Playgroud)

现在因为我只想做一些快速检查,所有我需要的是整个文件的少数/任意n行.

df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").take(n)
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").head(n)
Run Code Online (Sandbox Code Playgroud)

但所有这些都在文件加载完成后运行.我不能在读取文件本身时限制行数吗?我指的是spark-csv中n_rows等效的pandas,如:

pd_df = pandas.read_csv("file_path", nrows=20)
Run Code Online (Sandbox Code Playgroud)

或者可能是火花实际上没有加载文件,第一步,但在这种情况下,为什么我的文件加载步骤需要花费太多时间呢?

我想要

df.count()
Run Code Online (Sandbox Code Playgroud)

只给我n而不是所有的行,是否可能?

eli*_*sah 12

你可以用limit(n).

sqlContext.format('com.databricks.spark.csv') \
          .options(header='true', inferschema='true').load("file_path").limit(20)
Run Code Online (Sandbox Code Playgroud)

这只会加载20行.

  • “作为传递数据的扫描”==“加载为将数据放入内存中”与 Spark 执行程序相同。 (2认同)
  • @eliasah 提供的解决方案首先加载文件,然后限制它。如果我有 PB 文件,这有什么帮助?这无论如何都会加载所有行,然后显示有限的记录 n。 (2认同)

Jac*_*ski 9

我的理解是spark-csv模块不直接支持仅读取几行,并且作为一种变通办法,您可以将文件读取为文本文件,并根据需要选择多行并将其保存到某个临时位置。保存线路后,您可以使用spark-csv读取线路,包括inferSchemaoptions(如果您处于探索模式,则可能要使用该选项)。

val numberOfLines = ...
spark.
  read.
  text("myfile.csv").
  limit(numberOfLines).
  write.
  text(s"myfile-$numberOfLines.csv")
val justFewLines = spark.
  read.
  option("inferSchema", true). // <-- you are in exploration mode, aren't you?
  csv(s"myfile-$numberOfLines.csv")
Run Code Online (Sandbox Code Playgroud)