我在 EMR 上使用 Apache Spark 进行了大量的 ETL。
我对获得良好性能所需的大部分调整相当满意,但我有一项工作我似乎无法弄清楚。
基本上,我获取了大约 1 TB 的 parquet 数据(分布在 S3 中的数万个文件中),并添加了几列并按数据的日期属性之一分区将其写出 - 再次,在 S3 中格式化了 parquet。
我这样跑:
spark-submit --conf spark.dynamicAllocation.enabled=true --num-executors 1149 --conf spark.driver.memoryOverhead=5120 --conf spark.executor.memoryOverhead=5120 --conf spark.driver.maxResultSize=2g --conf spark.sql.shuffle.partitions=1600 --conf spark.default.parallelism=1600 --executor-memory 19G --driver-memory 19G --executor-cores 3 --driver-cores 3 --class com.my.class path.to.jar <program args>
Run Code Online (Sandbox Code Playgroud)
集群的大小是根据输入数据集的大小动态确定的,并且num-executors、spark.sql.shuffle.partitions和spark.default.parallelism参数是根据集群的大小计算的。
代码大致是这样实现的:
va df = (read from s3 and add a few columns like timestamp and source file name)
val dfPartitioned = df.coalesce(numPartitions)
val sqlDFProdDedup = spark.sql(s""" (query to dedup …Run Code Online (Sandbox Code Playgroud) 每当DAG中的任务运行失败或重试运行时,我都试图让Airflow使用AWS SES向我发送电子邮件。我也在使用我的AWS SES凭证,而不是我的一般AWS凭证。
我当前的airflow.cfg
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 25
smtp_mail_from = myemail@myjob.com
Run Code Online (Sandbox Code Playgroud)
我的DAG中当前旨在故意失败并重试的任务:
testfaildag_library_install_jar_jdbc = PythonOperator(
task_id='library_install_jar',
retries=3, …Run Code Online (Sandbox Code Playgroud)