Spark / Scala打开压缩的CSV文件

Ben*_*Ben 5 scala apache-spark

我是Spark和Scala的新手。我们有广告事件日志文件,格式为CSV,然后使用pkzip压缩。我已经看到了许多有关如何使用Java解压缩压缩文件的示例,但是如何使用Scala for Spark来执行此操作?最终,我们希望从每个传入文件中获取,提取数据并将其加载到Hbase目标表中。也许可以使用HadoopRDD做到这一点吗?此后,我们将介绍Spark流技术来监视这些文件。

谢谢,本

Ata*_*ais 5

默认压缩支持

如果您使用 Spark (Hadoop) 中默认提供的压缩格式,@samthebest答案是正确的。哪个是:

  • 压缩包2
  • 压缩包
  • lz4
  • 活泼的

我在其他答案中更深入地解释了这个主题:/sf/answers/3217072771/

阅读拉链

但是,如果您尝试读取zip文件,则需要创建自定义解决方案。我已经提供的答案中提到了一个。

如果您需要从存档中读取多个文件,您可能对我提供的答案感兴趣:https ://stackoverflow.com/a/45958458/1549135

基本上,一直使用sc.binaryFiles并随后解压缩PortableDataStream,如示例中所示:

sc.binaryFiles(path, minPartitions)
  .flatMap { case (name: String, content: PortableDataStream) =>
    val zis = new ZipInputStream(content.open)
    Stream.continually(zis.getNextEntry)
          .takeWhile(_ != null)
          .flatMap { _ =>
              val br = new BufferedReader(new InputStreamReader(zis))
              Stream.continually(br.readLine()).takeWhile(_ != null)
          }
Run Code Online (Sandbox Code Playgroud)


sam*_*est 4

在 Spark 中,如果您的文件具有正确的文件名后缀(例如 .gz 表示 gzipped),并且它受 的支持org.apache.hadoop.io.compress.CompressionCodecFactory,那么您可以使用

sc.textFile(path)
Run Code Online (Sandbox Code Playgroud)

更新:在撰写本文时,Hadoop bzip2 库中存在一个错误,这意味着尝试使用 Spark 读取 bzip2 文件会导致奇怪的异常 - 通常是 ArrayIndexOutOfBounds。