Spark读写S3-分区大小和压缩

Pal*_*ant 4 gzip amazon-s3 amazon-web-services apache-spark

我正在做一个实验,以了解s3和[EMR + Spark]哪种文件大小表现最佳

输入数据:不可压缩数据:文件中的随机字节总数据大小:20GB每个文件夹具有不同的输入文件大小:从2MB到4GB文件大小。

集群规格:1个主节点+ 4个节点:C3.8xls-驱动程序内存5G \-执行程序内存3G \-执行程序核心2 \-执行程序60 \

代码:

scala> def time[R](block: => R): R = {
          val t0 = System.nanoTime()
          val result = block    // call-by-name
         val t1 = System.nanoTime()
          println("Elapsed time: " + (t1 - t0) + "ns")
          result
      }
time: [R](block: => R)R

scala> val inputFiles = time{sc.textFile("s3://bucket/folder/2mb-10240files-20gb/*/*")};
scala> val outputFiles = time {inputFiles.saveAsTextFile("s3://bucket/folder-out/2mb-10240files-20gb/")};
Run Code Online (Sandbox Code Playgroud)

观察=>

  • 2MB-32MB:大多数时间都用于打开文件句柄[效率不高]
  • 从64MB到1GB:Spark本身针对所有这些文件大小启动320个任务,它不再具有20GB数据的存储桶中的文件数量,例如512 mb文件具有40个文件来生成20gb数据,并且可以完成40个任务,但是而是有320个任务,每个任务处理64MB数据。
  • 4GB文件大小:0字节输出[无法处理内存/数据甚至不可拆分???]

问题=>

  • 强制处理输入大小的任何默认设置为64MB?
  • 由于我正在使用的数据是随机字节并且已经压缩,因此如何进一步拆分此数据?如果可以拆分此数据,为什么无法拆分4gb目标文件大小的文件大小?
  • 通过Spark上传后,为什么压缩文件的大小会增加?2MB的压缩输入文件在输出存储区中变为3.6 MB。

Ra4*_*41P 6

由于未指定,因此我假设在回答中使用gzip和Spark 2.2。

  • 强制处理输入大小的任何默认设置为64MB?

就在这里。Spark是Hadoop项目,因此即使S3是基于对象的文件系统,也将其视为基于块的文件系统。因此,这里的真正问题是:您正在使用S3文件系统的哪个实现(s3a,s3n)等。在这里可以找到类似的问题。

  • 由于我使用的数据是随机字节,并且已经压缩,因此如何进一步拆分该数据?如果它可以拆分此数据,为什么不能拆分4GB目标文件大小的文件大小?

Spark文档表明它能够读取压缩文件:

Spark的所有基于文件的输入方法(包括textFile)都支持在目录,压缩文件和通配符上运行。例如,可以使用textFile(“ / my / directory”),textFile(“ / my / directory / .txt”)和textFile(“ / my / directory / .gz”)。

这意味着您的文件很容易阅读,并转换为每一行的纯文本字符串。

但是,您正在使用压缩文件。假设它是不可拆分的格式(例如gzip),则需要整个文件进行解压缩。您正在使用3gb执行程序运行,这些执行程序可以很好地满足4mb-1gb文件的需求,但不能一次处理大于3gb的文件(考虑到开销后可能会变小)。

一些进一步的信息可以在这个问题中找到。可拆分压缩类型的详细信息可以在此答案中找到。

  • 通过Spark上传后,为什么压缩文件的大小会增加?2MB的压缩输入文件在输出存储区中变为3.6 MB。

作为上一点的推论,这意味着spark在以纯文本格式读取时已经解压缩了RDD。重新上传时,不再压缩。要进行压缩,可以将压缩编解码器作为参数传递:

sc.saveAsTextFile("s3://path", classOf[org.apache.hadoop.io.compress.GzipCodec])
Run Code Online (Sandbox Code Playgroud)

还有其他压缩格式可用。