I'm trying to get Airflow to send me an email via AWS SES whenever a task in my DAG fails or retries. I'm using my AWS SES credentials rather than my general AWS credentials.

My current airflow.cfg:
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 25
smtp_mail_from = myemail@myjob.com
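For reference, retry and failure emails are switched on per task rather than in airflow.cfg alone. A minimal sketch of the `default_args` keys involved (these are standard BaseOperator arguments; the address is a placeholder):

```python
# Sketch: per-task email switches that pair with the [smtp] section above.
# Usually passed to the DAG via default_args so every task inherits them.
default_args = {
    'email': ['myemail@myjob.com'],   # recipient list (placeholder)
    'email_on_failure': True,         # mail when the task finally fails
    'email_on_retry': True,           # mail on every retry attempt
    'retries': 3,
}
```
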
The task in my DAG that is currently designed to fail and retry on purpose:
testfaildag_library_install_jar_jdbc = PythonOperator(
    task_id='library_install_jar',
    retries=3, …

I have another DAG, and whenever it succeeds or fails I want it to trigger a method that posts to Slack.
My DAG's default_args are as follows:
default_args = {
    [...]
    'on_failure_callback': slack.slack_message(sad_message),
    'on_success_callback': slack.slack_message(happy_message),
    [...]
}
And the DAG definition itself:
dag = DAG(
    dag_id=dag_name_id,
    default_args=default_args,
    description='load data from mysql to S3',
    schedule_interval='*/10 * * * *',
    catchup=False
)
However, when I check Slack there are more than 100 messages a minute, as if the callbacks are evaluated on every scheduler heartbeat: on each pass both the success and the failure methods are run, regardless of the actual state of the task instance (not fine).

How do I use on_failure_callback and on_success_callback correctly so they react to the DAG's state and call my custom method?
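A likely cause, sketched below under the assumption that slack.slack_message is the poster's own helper: writing 'on_failure_callback': slack.slack_message(sad_message) calls the function immediately, every time the scheduler parses the DAG file, and stores its return value (probably None) as the callback. Passing a callable instead defers the call until the task actually fails or succeeds:

```python
# Sketch of the fix; FakeSlack, sad_message and happy_message stand in
# for the poster's own helpers. The point: store callables, don't call
# them at DAG-parse time.
posted = []

class FakeSlack:                      # stand-in for the real slack module
    def slack_message(self, text):
        posted.append(text)

slack = FakeSlack()
sad_message = 'task failed'
happy_message = 'task succeeded'

# Airflow invokes these with the task-instance context dict when the
# task actually fails or succeeds -- not on every scheduler heartbeat.
def notify_failure(context):
    slack.slack_message(sad_message)

def notify_success(context):
    slack.slack_message(happy_message)

default_args = {
    'on_failure_callback': notify_failure,   # a callable, not a call
    'on_success_callback': notify_success,
}
```

Note that on_failure_callback/on_success_callback in default_args apply to every task in the DAG; if you want a single notification per DAG run, newer Airflow versions also accept these callbacks directly on the DAG object, which is worth checking against your version.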