我在 EMR 上使用 Apache Spark 进行了大量的 ETL。
我对获得良好性能所需的大部分调整相当满意,但我有一项工作我似乎无法弄清楚。
基本上,我获取了大约 1 TB 的 parquet 数据(分布在 S3 中的数万个文件中),并添加了几列并按数据的日期属性之一分区将其写出 - 再次,在 S3 中格式化了 parquet。
我这样跑:
spark-submit --conf spark.dynamicAllocation.enabled=true --num-executors 1149 --conf spark.driver.memoryOverhead=5120 --conf spark.executor.memoryOverhead=5120 --conf spark.driver.maxResultSize=2g --conf spark.sql.shuffle.partitions=1600 --conf spark.default.parallelism=1600 --executor-memory 19G --driver-memory 19G --executor-cores 3 --driver-cores 3 --class com.my.class path.to.jar <program args>
Run Code Online (Sandbox Code Playgroud)
集群的大小是根据输入数据集的大小动态确定的,并且num-executors、spark.sql.shuffle.partitions和spark.default.parallelism参数是根据集群的大小计算的。
代码大致是这样实现的:
va df = (read from s3 and add a few columns like timestamp and source file name)
val dfPartitioned = df.coalesce(numPartitions)
val sqlDFProdDedup = spark.sql(s""" (query to dedup …Run Code Online (Sandbox Code Playgroud)