dre*_*mer 3 hadoop amazon-s3 apache-spark parquet spark-dataframe
我正在运行一个Spark工作,这个工作花了太长时间来处理输入文件.输入文件为Gzip格式的6.8 GB,包含110 M行文本.我知道它是Gzip格式的,因此它不可拆分,只有一个执行器将用于读取该文件.
作为调试过程的一部分,我决定只看看将gzip文件转换为镶木地板需要多长时间.我的想法是,一旦我转换为镶木地板文件,然后如果我在该文件上运行我原来的Spark作业,那么它将使用多个执行程序并且输入文件将被并行处理.
但即使是小工作也需要很长时间.这是我的代码:
val input = sqlContext.read.text("input.gz")
input.write.parquet("s3n://temp-output/")
Run Code Online (Sandbox Code Playgroud)
当我在笔记本电脑(16 GB RAM)中提取该文件时,花了不到2分钟.当我在Spark集群上运行它时,我的期望是它将花费相同甚至更少的时间,因为我使用的执行程序内存是58 GB.花了大约20分钟.
我在这里错过了什么?我很抱歉,如果这听起来很业余,但我在Spark中相当新.
在gzip文件上运行Spark作业的最佳方法是什么?假设我没有选择以其他文件格式创建该文件(bzip2,snappy,lzo).
在执行输入流程输出类型的Spark作业时,需要考虑三个单独的问题:
在您的情况下,输入并行度为1,因为在您的问题中,您声称无法更改输入格式或粒度.
你也基本上没有处理,所以你不能获得任何收益.
但是,您可以控制输出并行性,这将带来两个好处:
将写入多个CPU,从而减少写入操作的总时间.
您的输出将分成多个文件,允许您在以后的处理中利用输入并行性.
要增加并行性,您必须增加分区数量repartition(),例如,
val numPartitions = ...
input.repartition(numPartitions).write.parquet("s3n://temp-output/")
Run Code Online (Sandbox Code Playgroud)
在选择最佳分区数时,需要考虑许多不同的因素.
如果不了解您的目标和限制条件,很难做出可靠的建议,但这里有一些通用的指导原则:
由于您的分区不会出现偏差(上面的使用repartition将使用纠正偏斜的散列分区程序),如果您将分区数设置为等于执行程序核心数,假设您正在使用,则将获得最快的吞吐量具有足够I/O的节点.
处理数据时,您确实希望整个分区能够"适应"分配给单个执行程序核心的RAM.什么"适合"在这里意味着取决于您的处理.如果您正在进行简单的map转换,则可以对数据进行流式处理.如果您正在做涉及订购的事情,那么RAM需要大幅增长.如果您使用Spark 1.6+,您将获得更灵活的内存管理的好处.如果您使用的是早期版本,则必须更加小心.当Spark必须开始"缓冲"到磁盘时,作业执行停止.磁盘大小和内存大小可能非常不同.后者根据您处理数据的方式以及Spark从谓词下推获得的好处(Parquet支持)而有所不同.使用Spark UI查看各个作业阶段需要多少RAM.
顺便说一句,除非您的数据具有非常特定的结构,否则不要硬编码分区号,因为那时您的代码将在不同大小的集群上以次优的方式运行.而是,使用以下技巧来确定群集中的执行程序数.然后,您可以根据您使用的计算机乘以每个执行程序的核心数.
// -1 is for the driver node
val numExecutors = sparkContext.getExecutorStorageStatus.length - 1
Run Code Online (Sandbox Code Playgroud)
作为参考点,在我们的团队中,我们使用相当复杂的数据结构,这意味着RAM大小>>磁盘大小,我们的目标是将S3对象保持在50-250Mb范围内,以便在每个执行器核心具有的节点上进行处理10-20Gb RAM.
希望这可以帮助.
| 归档时间: |
|
| 查看次数: |
1563 次 |
| 最近记录: |