Spark SQL 如何读取压缩后的 csv 文件?

G_c*_*_cy 5 csv apache-spark apache-spark-sql

我曾与API试图spark.read.csv读取与扩展压缩csv文件bzgzip。有效。但是在源代码中我没有找到任何可以声明codec类型的选项参数。

即使在这个链接中,也只有codec写作方面的设置。谁能告诉我或提供显示 spark 2.x 版本如何处理压缩 csv 文件的源代码的路径。

Jac*_*ski 4

所有与文本相关的数据源,包括CSVDataSource,都使用 Hadoop File API 来处理文件(Spark Core 的 RDD 中也有)。

您可以在readFile中找到导致HadoopFileLinesReader的相关行,其中包含以下行:

val fileSplit = new FileSplit(
  new Path(new URI(file.filePath)),
  file.start,
  file.length,
  // TODO: Implement Locality
  Array.empty)
Run Code Online (Sandbox Code Playgroud)

它使用 Hadoop 的org.apache.hadoop.fs.Path来处理底层文件的压缩。


经过快速谷歌搜索后,我找到了处理压缩的 Hadoop 属性,即mapreduce.output.fileoutputformat.compress.

这导致我使用 Spark SQL 的CompressionCodecs进行以下压缩配置:

"none" -> null,
"uncompressed" -> null,
"bzip2" -> classOf[BZip2Codec].getName,
"deflate" -> classOf[DeflateCodec].getName,
"gzip" -> classOf[GzipCodec].getName,
"lz4" -> classOf[Lz4Codec].getName,
"snappy" -> classOf[SnappyCodec].getName)
Run Code Online (Sandbox Code Playgroud)

在下面的代码中,您可以找到使用“our”选项的setCodecConfiguration 。

  def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
    if (codec != null) {
      conf.set("mapreduce.output.fileoutputformat.compress", "true")
      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
      conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
      conf.set("mapreduce.map.output.compress", "true")
      conf.set("mapreduce.map.output.compress.codec", codec)
    } else {
      // This infers the option `compression` is set to `uncompressed` or `none`.
      conf.set("mapreduce.output.fileoutputformat.compress", "false")
      conf.set("mapreduce.map.output.compress", "false")
    }
  }
Run Code Online (Sandbox Code Playgroud)

另一个方法getCodecClassName用于解析JSONCSV文本compression格式的选项。

  • 非常感谢您的耐心和友善。我通过链读取了“getCodecClassName”代码。我发现那部分代码只在编写端调用。我在阅读方面没有找到用法。我认为这项工作可能由文件系统来完成;但找不到证据。 (2认同)
  • 有趣的信息,但就像一些评论者指出的那样,它仅涉及写入端,而不是读取端。[此答案](/sf/answers/3106217761/) 没有显示 Spark 如何在读取时选择编解码器的相关内部结构,但它至少演示了如何指定自定义读取编解码器。 (2认同)