如何优化 Spark 将大量数据写入 S3

Zac*_*ack 8 scala amazon-s3 amazon-emr apache-spark

我在 EMR 上使用 Apache Spark 进行了大量的 ETL。

我对获得良好性能所需的大部分调整相当满意,但我有一项工作我似乎无法弄清楚。

基本上,我获取了大约 1 TB 的 parquet 数据(分布在 S3 中的数万个文件中),并添加了几列并按数据的日期属性之一分区将其写出 - 再次,在 S3 中格式化了 parquet。

我这样跑:

spark-submit --conf spark.dynamicAllocation.enabled=true  --num-executors 1149 --conf spark.driver.memoryOverhead=5120 --conf  spark.executor.memoryOverhead=5120 --conf  spark.driver.maxResultSize=2g --conf spark.sql.shuffle.partitions=1600 --conf spark.default.parallelism=1600 --executor-memory 19G --driver-memory 19G --executor-cores 3 --driver-cores 3 --class com.my.class path.to.jar <program args>
Run Code Online (Sandbox Code Playgroud)

集群的大小是根据输入数据集的大小动态确定的,并且num-executors、spark.sql.shuffle.partitions和spark.default.parallelism参数是根据集群的大小计算的。

代码大致是这样实现的:

va df = (read from s3 and add a few columns like timestamp and source file name)

val dfPartitioned = df.coalesce(numPartitions)

val sqlDFProdDedup = spark.sql(s""" (query to dedup against prod data """);

sqlDFProdDedup.repartition($"partition_column")
  .write.partitionBy("partition_column")
  .mode(SaveMode.Append).parquet(outputPath)
Run Code Online (Sandbox Code Playgroud)

当我查看神经节图时,我在重复数据删除逻辑运行和一些数据洗牌时遇到了巨大的资源峰值,但随后数据的实际写入仅使用了一小部分资源并运行了几个小时。

我认为主要问题不是分区倾斜,因为数据应该公平地分布在所有分区上。

分区列本质上是一个月中的一天,因此每个作业通常只有 5-20 个分区,具体取决于输入数据集的跨度。每个分区通常在 10-20 个 parquet 文件中包含大约 100 GB 的数据。

我设置 Spark.sql.files.maxRecordsPerFile 来管理这些输出文件的大小。

所以,我的大问题是:如何提高这里的性能?

仅仅添加资源似乎并没有多大帮助。

我尝试过增大执行器(以减少洗牌)并增加每个执行器的 CPU 数量,但这似乎并不重要。

提前致谢!

小智 8

Zack,我有一个类似的用例,每天要处理的文件数量是“n”倍。我假设您按原样使用上面的代码并尝试提高整体工作的性能。以下是我的一些观察:

  1. 不确定该coalesce(numPartitions)数字实际上是什么以及为什么在重复数据删除过程之前使用它。您的 Spark-submit 显示您正在创建 1600 个分区,这已经足够开始了。

  2. 如果您要在写入之前重新分区,那么上面的合并可能根本没有好处,因为重新分区会打乱数据。

  3. 由于您声称编写了 10-20 个 parquet 文件,这意味着您在工作的最后部分仅使用 10-20 个核心进行编写,这是其速度缓慢的主要原因。根据 100 GB 估计,镶木地板文件的范围从大约 5GB 到 10 GB,这确实很大,我怀疑人们是否能够在本地笔记本电脑或 EC2 机器上打开它们,除非他们使用 EMR 或类似的设备(如果读取,则具有巨大的执行程序内存)整个文件或溢出到磁盘),因为内存要求太高。我建议创建大约 1GB 的镶木地板文件以避免任何这些问题。

此外,如果您创建 1GB parquet 文件,您可能会将进程速度加快 5 到 10 倍,因为您将使用更多执行器/核心来并行写入它们。实际上,您可以通过简单地使用默认分区编写数据帧来运行实验。

这让我意识到,您确实不需要像 write.partitionBy("partition_date") 调用那样使用重新分区。您的repartition()调用实际上是强制数据帧最多只有 30-31 个分区,具体取决于该月的天数,这正是驱动写入文件数量的原因。实际上是write.partitionBy("partition_date")在 S3 分区中写入数据,如果您的数据帧有 90 个分区,那么写入速度会快 3 倍 (3 *30)。df.repartition()正在迫使它放慢速度。您真的需要 5GB 或更大的文件吗?

  1. 另一个要点是 Spark 惰性求值有时太聪明了。在您的情况下,它很可能只使用基于repartition(number). 相反,你应该尝试,df.cache() -> df.count() and then df.write(). 这样做的目的是强制 Spark 使用所有可用的执行器核心。我假设您正在并行读取文件。在您当前的实施中,您可能使用 20-30 个核心。有一点需要注意,当您使用 r4/r5 机器时,请随意将执行器内存增加到 8 核的 48G。我发现 8 核对于我的任务来说比标准的 5 核建议更快。

  2. 另一个建议是尝试 ParallelGC 而不是 G1GC。对于这样的用例,当您读取 1000 倍的文件时,我注意到它的性能比 G1Gc 更好或不差。请试一试。

在我的工作负载中,我使用coalesce(n)基于方法,其中“n”为我提供了 1GB parquet 文件。我使用集群上所有可用的核心并行读取文件。仅在写入部分期间,我的核心处于空闲状态,但您无能为力来避免这种情况。

我不确定如何spark.sql.files.maxRecordsPerFile与 Pandas、Redshift Spectrum、Athena 等结合使用coalesce() or repartition(),但我发现 1GB 似乎可以接受。

希望能帮助到你。查鲁


小智 0

以下是一些加快运行速度的优化。

(1) 文件提交者 - 这是 Spark 将部分文件读取到 S3 存储桶的方式。每个操作都是不同的,并且将基于

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
Run Code Online (Sandbox Code Playgroud)

描述

这会将文件直接写入零件文件,或者最初将它们加载到临时文件并将它们复制到其最终状态零件文件。

(2) 对于文件大小,您可以根据每条记录的平均字节数得出它。下面我计算出每条记录的字节数,从而得出 1024 MB 的记录数。我会首先尝试每个分区 1024MB,然后向上移动。

import org.apache.spark.util.SizeEstimator

val numberBytes : Long = SizeEstimator.estimate(inputDF.rdd)
val reduceBytesTo1024MB = numberBytes/123217728
val numberRecords = inputDF.count
val recordsFor1024MB = (numberRecords/reduceBytesTo1024MB).toInt + 1 
Run Code Online (Sandbox Code Playgroud)

(3) [我没有尝试过] EMR Committer - 如果您使用的是 EMR 5.19 或更高版本,因为您正在输出 Parquet。您可以将 Parquet 优化写入器设置为 TRUE。

spark.sql.parquet.fs.optimized.committer.optimization-enabled true
Run Code Online (Sandbox Code Playgroud)