如何在EMR上调整spark作业以在S3上快速写入大量数据

SUD*_*HAN 17 amazon-emr hadoop2 apache-spark-sql spark-dataframe

我有一个火花工作,我在两个数据帧之间进行外连接.第一个数据帧的大小为260 GB,文件格式为文本文件,分为2200个文件,第二个数据帧的大小为2GB.然后将大约260 GB的数据帧输出写入S3需要很长时间超过2小时后我取消因为我已经在EMR上进行了大量更改.

这是我的群集信息.

emr-5.9.0
Master:    m3.2xlarge
Core:      r4.16xlarge   10 machines (each machine has 64 vCore, 488 GiB memory,EBS Storage:100 GiB)
Run Code Online (Sandbox Code Playgroud)

这是我正在设置的群集配置

capacity-scheduler  yarn.scheduler.capacity.resource-calculator :org.apache.hadoop.yarn.util.resource.DominantResourceCalculator
emrfs-site  fs.s3.maxConnections:   200
spark   maximizeResourceAllocation: true
spark-defaults  spark.dynamicAllocation.enabled:    true
Run Code Online (Sandbox Code Playgroud)

我尝试手动设置内存组件,如下所示,性能更好,但同样的事情又花了很长时间

--num-executors 60 - conf spark.yarn.executor.memoryOverhead = 9216 --executor-memory 72G --conf spark.yarn.driver.memoryOverhead = 3072 --driver-memory 26G --execeror-cores 10 - driver-cores 3 --conf spark.default.parallelism = 1200

我没有使用默认分区将数据保存到S3.

添加有关作业和查询计划的所有详细信息,以便于理解.

真正的原因是分区.这大部分时间都在占用.因为我有2K文件,所以如果我使用像200这样的重新分区,输出文件以十万分之一形式出现,然后在spark中重新加载并不是一个好故事.

在下图中我不知道为什么在项目之后再次调用sort 在此输入图像描述

在下面Image GC对我来说太高了..请oi必须处理这个请建议如何? 执行者和GC细节

下面是节点健康状态.这一点数据被保存到S3中,难怪为什么我只能看到两个节点处于活动状态并且都处于空闲状态. 这是我的节点详细信息.此时数据将保存到S3中

这是加载时的集群细节.在这一点上,我可以看到集群已被充分利用,但在将数据保存到S3时,许多节点都是免费的. 充分利用的clsuter

最后这里是我的代码,我执行Join然后保存到S3 ...

import org.apache.spark.sql.expressions._

          val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "FinancialStatementLineItem_lineItemId").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd HH:mm:ss.SSS").cast("timestamp").desc)
          val latestForEachKey = df2resultTimestamp.withColumn("rank", row_number.over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")

          val columnMap = latestForEachKey.columns.filter(c => c.endsWith("_1") & c != "FFAction|!|_1").map(c => c -> c.dropRight(2)) :+ ("FFAction|!|_1", "FFAction|!|")
          val exprs = columnMap.map(t => coalesce(col(s"${t._1}"), col(s"${t._2}")).as(s"${t._2}"))
          val exprsExtended = Array(col("uniqueFundamentalSet"), col("PeriodId"), col("SourceId"), col("StatementTypeCode"), col("StatementCurrencyId"), col("FinancialStatementLineItem_lineItemId")) ++ exprs

          //Joining both dara frame here
          val dfMainOutput = (dataMain.join(latestForEachKey, Seq("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "FinancialStatementLineItem_lineItemId"), "outer") select (exprsExtended: _*)).filter(!$"FFAction|!|".contains("D|!|"))
          //Joing ends here

          val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition", $"PartitionYear", $"PartitionStatement", concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").filter(_ != "PartitionYear").filter(_ != "PartitionStatement").map(c => col(c)): _*).as("concatenated"))

          val headerColumn = dataHeader.columns.toSeq

          val headerFinal = headerColumn.mkString("", "|^|", "|!|").dropRight(3)

          val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "|^|null", "")).withColumnRenamed("concatenated", headerFinal)

          //  dfMainOutputFinalWithoutNull.repartition($"DataPartition", $"PartitionYear", $"PartitionStatement")
  .write
  .partitionBy("DataPartition", "PartitionYear", "PartitionStatement")
  .format("csv")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .option("nullValue", "")
  .option("delimiter", "\t")
  .option("quote", "\u0000")
  .option("header", "true")
  .option("codec", "bzip2")
  .save(outputFileURL)
Run Code Online (Sandbox Code Playgroud)

Wil*_*ill 3

您正在运行五个 c3.4large EC2 实例,每个实例有 30GB RAM。所以总共只有 150GB,比要加入的 >200GB 数据帧要小得多。因此会出现大量磁盘溢出。也许您可以启动 r 类型 EC2 实例(内存优化,而不是计算优化的 c 类型),并查看是否有性能改进。