相关疑难解决方法(0)

在Apache Airflow DAG中使用AWS SES发送失败​​电子邮件

每当DAG中的任务运行失败或重试运行时,我都试图让Airflow使用AWS SES向我发送电子邮件。我也在使用我的AWS SES凭证,而不是我的一般AWS凭证。

我当前的airflow.cfg

[email]
email_backend = airflow.utils.email.send_email_smtp


[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com 
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 25
smtp_mail_from = myemail@myjob.com
Run Code Online (Sandbox Code Playgroud)

我的DAG中当前旨在故意失败并重试的任务:

testfaildag_library_install_jar_jdbc = PythonOperator(
    task_id='library_install_jar',
    retries=3, …
Run Code Online (Sandbox Code Playgroud)

amazon-web-services amazon-ses airflow

2
推荐指数
1
解决办法
2514
查看次数

气流 - 处理DAG回调的正确方法

我有一个DAG然后无论何时成功或失败,我希望它触发一个发布到Slack的方法.

我的DAG args情况如下:

default_args = {
    [...]
    'on_failure_callback': slack.slack_message(sad_message),
    'on_success_callback': slack.slack_message(happy_message),
    [...]
}
Run Code Online (Sandbox Code Playgroud)

DAG定义本身:

dag = DAG(
    dag_id = dag_name_id,
    default_args=default_args,
    description='load data from mysql to S3',
    schedule_interval='*/10 * * * *',
    catchup=False
      )
Run Code Online (Sandbox Code Playgroud)

但是,当我检查Slack时,每分钟有超过100条消息,好像正在评估每个调度程序心跳,并且对于每个日志,它确实运行了成功和失败方法,就好像它工作并且不适用于同一个任务实例(不是精细).

我应该如何正确使用on_failure_callbackon_success_callback处理dags状态并调用自定义方法?

python airflow airflow-scheduler

1
推荐指数
2
解决办法
2644
查看次数