相关疑难解决方法(0)

气流默认为on_failure_callback

在我的DAG文件中,我定义了一个on_failure_callback()函数,以便在发生故障时发布Slack.

如果我为DAG中的每个运算符指定它,它运行良好:on_failure_callback = on_failure_callback()

有没有办法自动化(例如通过default_args,或通过我的DAG对象)调度到我的所有运营商?

python operators airflow apache-airflow

14
推荐指数
1
解决办法
1万
查看次数

气流 - 处理DAG回调的正确方法

我有一个DAG然后无论何时成功或失败,我希望它触发一个发布到Slack的方法.

我的DAG args情况如下:

default_args = {
    [...]
    'on_failure_callback': slack.slack_message(sad_message),
    'on_success_callback': slack.slack_message(happy_message),
    [...]
}
Run Code Online (Sandbox Code Playgroud)

DAG定义本身:

dag = DAG(
    dag_id = dag_name_id,
    default_args=default_args,
    description='load data from mysql to S3',
    schedule_interval='*/10 * * * *',
    catchup=False
      )
Run Code Online (Sandbox Code Playgroud)

但是,当我检查Slack时,每分钟有超过100条消息,好像正在评估每个调度程序心跳,并且对于每个日志,它确实运行了成功和失败方法,就好像它工作并且不适用于同一个任务实例(不是精细).

我应该如何正确使用on_failure_callbackon_success_callback处理dags状态并调用自定义方法?

python airflow airflow-scheduler

1
推荐指数
2
解决办法
2644
查看次数