Apache Spark中的Zip支持

Mik*_*kia 9 compression zip apache-spark

我看了一下星火对的支持gzip的 -kind输入文件在这里,我不知道如果同样支持不同类型的压缩文件,如存在.ZIP文件.到目前为止,我已经尝试计算压缩在zip文件下的文件,但Spark似乎无法成功读取其内容.

我已经采取了外观的HadoopnewAPIHadoopFilenewAPIHadoopRDD,但到目前为止,我还没有能够得到任何工作.

此外,Spark支持为指定文件夹下的每个文件创建分区,如下例所示:

SparkConf SpkCnf = new SparkConf().setAppName("SparkApp")
                                  .setMaster("local[4]");

JavaSparkContext Ctx = new JavaSparkContext(SpkCnf);

JavaRDD<String> FirstRDD = Ctx.textFile("C:\input\).cache();
Run Code Online (Sandbox Code Playgroud)

where C:\input\指向具有多个文件的目录.

在计算压缩文件的情况下,是否也可以将每个文件打包在一个压缩文件下,并按照每个文件一个分区的相同模式?

Hol*_*den 7

由于Apache Spark使用Hadoop输入格式,我们可以查看有关如何处理zip文件的hadoop文档,看看是否有适用的东西.

这个网站让我们知道如何使用它(即我们可以使用ZipFileInputFormat).话虽这么说,因为zip文件不是拆分表(见这个),你的单个压缩文件的请求并不是很好.相反,如果可能的话,最好有一个包含许多单独的zip文件的目录.

这个问题与其他问题类似,但它增加了一个额外的问题:是否可以有一个zip文件(因为它不是一个分裂表格式不是一个好主意).


Ata*_*ais 5

Spark默认支持压缩文件

根据Spark编程指南

Spark的所有基于文件的输入方法(包括textFile)都支持在目录,压缩文件和通配符上运行.例如,您可以使用textFile("/ my/directory"),textFile("/ my/directory/.txt")和textFile("/ my/directory / .gz").

这可以通过提供有关Hadoop支持的压缩格式的信息来扩展,基本上可以通过查找所有扩展类来检查CompressionCodec(docs)

name    | ext      | codec class
-------------------------------------------------------------
bzip2   | .bz2     | org.apache.hadoop.io.compress.BZip2Codec 
default | .deflate | org.apache.hadoop.io.compress.DefaultCodec 
deflate | .deflate | org.apache.hadoop.io.compress.DeflateCodec 
gzip    | .gz      | org.apache.hadoop.io.compress.GzipCodec 
lz4     | .lz4     | org.apache.hadoop.io.compress.Lz4Codec 
snappy  | .snappy  | org.apache.hadoop.io.compress.SnappyCodec
Run Code Online (Sandbox Code Playgroud)

来源:列出可用的hadoop编解码器

因此,只需调用以下内容即可实现上述格式和更多可能性:

sc.readFile(path)
Run Code Online (Sandbox Code Playgroud)

在Spark中读取zip文件

不幸的是,zip默认情况下不在受支持的列表中.

我找到了一篇很棒的文章:Hadoop:在Map/Reduce中处理ZIP文件,以及一些解释(示例),解释了如何ZipFileInputFormatsc.newAPIHadoopFileAPI 一起导入.但这对我不起作用.

我的解决方案

如果没有任何外部依赖关系,您可以加载文件,sc.binaryFiles然后解压缩PortableDataStream读取内容.这是我选择的方法.

import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream
import org.apache.spark.SparkContext
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD

implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {

    def readFile(path: String,
                 minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {

      if (path.endsWith(".zip")) {
        sc.binaryFiles(path, minPartitions)
          .flatMap { case (name: String, content: PortableDataStream) =>
            val zis = new ZipInputStream(content.open)
            // this solution works only for single file in the zip
            val entry = zis.getNextEntry
            val br = new BufferedReader(new InputStreamReader(zis))
            Stream.continually(br.readLine()).takeWhile(_ != null)
          }
      } else {
        sc.textFile(path, minPartitions)
      }
    }
  }
Run Code Online (Sandbox Code Playgroud)

使用此隐式类,您需要导入它并调用readFile 方法SparkContext:

import com.github.atais.spark.Implicits.ZipSparkContext
sc.readFile(path)
Run Code Online (Sandbox Code Playgroud)

隐式类将zip正确加载您的文件并RDD[String]像以前一样返回.

注意:这仅适用于zip存档中的单个文件!
对于zip支持中的多个文件,请查看以下答案:https://stackoverflow.com/a/45958458/1549135