小编Zac*_*ack的帖子

如何优化 Spark 将大量数据写入 S3

我在 EMR 上使用 Apache Spark 进行了大量的 ETL。

我对获得良好性能所需的大部分调整相当满意,但我有一项工作我似乎无法弄清楚。

基本上,我获取了大约 1 TB 的 parquet 数据(分布在 S3 中的数万个文件中),并添加了几列并按数据的日期属性之一分区将其写出 - 再次,在 S3 中格式化了 parquet。

我这样跑:

spark-submit --conf spark.dynamicAllocation.enabled=true  --num-executors 1149 --conf spark.driver.memoryOverhead=5120 --conf  spark.executor.memoryOverhead=5120 --conf  spark.driver.maxResultSize=2g --conf spark.sql.shuffle.partitions=1600 --conf spark.default.parallelism=1600 --executor-memory 19G --driver-memory 19G --executor-cores 3 --driver-cores 3 --class com.my.class path.to.jar <program args>
Run Code Online (Sandbox Code Playgroud)

集群的大小是根据输入数据集的大小动态确定的,并且num-executors、spark.sql.shuffle.partitions和spark.default.parallelism参数是根据集群的大小计算的。

代码大致是这样实现的:

va df = (read from s3 and add a few columns like timestamp and source file name)

val dfPartitioned = df.coalesce(numPartitions)

val sqlDFProdDedup = spark.sql(s""" (query to dedup …
Run Code Online (Sandbox Code Playgroud)

scala amazon-s3 amazon-emr apache-spark

8
推荐指数
2
解决办法
2万
查看次数

在Apache Airflow DAG中使用AWS SES发送失败​​电子邮件

每当DAG中的任务运行失败或重试运行时,我都试图让Airflow使用AWS SES向我发送电子邮件。我也在使用我的AWS SES凭证,而不是我的一般AWS凭证。

我当前的airflow.cfg

[email]
email_backend = airflow.utils.email.send_email_smtp


[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com 
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 25
smtp_mail_from = myemail@myjob.com
Run Code Online (Sandbox Code Playgroud)

我的DAG中当前旨在故意失败并重试的任务:

testfaildag_library_install_jar_jdbc = PythonOperator(
    task_id='library_install_jar',
    retries=3, …
Run Code Online (Sandbox Code Playgroud)

amazon-web-services amazon-ses airflow

2
推荐指数
1
解决办法
2514
查看次数