小编Zac*_*ack的帖子

如何优化 Spark 将大量数据写入 S3

我在 EMR 上使用 Apache Spark 进行了大量的 ETL。

我对获得良好性能所需的大部分调整相当满意,但我有一项工作我似乎无法弄清楚。

基本上,我获取了大约 1 TB 的 parquet 数据(分布在 S3 中的数万个文件中),并添加了几列并按数据的日期属性之一分区将其写出 - 再次,在 S3 中格式化了 parquet。

我这样跑:

spark-submit --conf spark.dynamicAllocation.enabled=true  --num-executors 1149 --conf spark.driver.memoryOverhead=5120 --conf  spark.executor.memoryOverhead=5120 --conf  spark.driver.maxResultSize=2g --conf spark.sql.shuffle.partitions=1600 --conf spark.default.parallelism=1600 --executor-memory 19G --driver-memory 19G --executor-cores 3 --driver-cores 3 --class com.my.class path.to.jar <program args>
Run Code Online (Sandbox Code Playgroud)

集群的大小是根据输入数据集的大小动态确定的,并且num-executors、spark.sql.shuffle.partitions和spark.default.parallelism参数是根据集群的大小计算的。

代码大致是这样实现的:

va df = (read from s3 and add a few columns like timestamp and source file name)

val dfPartitioned = df.coalesce(numPartitions)

val sqlDFProdDedup = spark.sql(s""" (query to dedup …
Run Code Online (Sandbox Code Playgroud)

scala amazon-s3 amazon-emr apache-spark

8
推荐指数
2
解决办法
2万
查看次数

标签 统计

amazon-emr ×1

amazon-s3 ×1

apache-spark ×1

scala ×1