气流 - 处理DAG回调的正确方法

Jul*_*ide 1 python airflow airflow-scheduler

我有一个DAG然后无论何时成功或失败,我希望它触发一个发布到Slack的方法.

我的DAG args情况如下:

default_args = {
    [...]
    'on_failure_callback': slack.slack_message(sad_message),
    'on_success_callback': slack.slack_message(happy_message),
    [...]
}
Run Code Online (Sandbox Code Playgroud)

DAG定义本身:

dag = DAG(
    dag_id = dag_name_id,
    default_args=default_args,
    description='load data from mysql to S3',
    schedule_interval='*/10 * * * *',
    catchup=False
      )
Run Code Online (Sandbox Code Playgroud)

但是,当我检查Slack时,每分钟有超过100条消息,好像正在评估每个调度程序心跳,并且对于每个日志,它确实运行了成功和失败方法,就好像它工作并且不适用于同一个任务实例(不是精细).

我应该如何正确使用on_failure_callbackon_success_callback处理dags状态并调用自定义方法?

cwu*_*rtz 7

它创建消息的原因是因为在定义消息时default_args,您正在执行这些功能.您只需传递函数定义而不执行它.

由于函数有一个参数,它会变得有点棘手.您可以定义两个部分函数或定义两个包装函数.

所以你可以这样做:

from functools import partial

success_msg = partial(slack.slack_message, happy_message);
failure_msg = partial(slack.slack_message, sad_message);

default_args = {
    [...]
    'on_failure_callback': failure_msg
    'on_success_callback': success_msg
    [...]
}
Run Code Online (Sandbox Code Playgroud)

要么

def success_msg():
    slack.slack_message(happy_message);

def failure_msg():
    slack.slack_message(sad_message);

default_args = {
    [...]
    'on_failure_callback': failure_msg
    'on_success_callback': success_msg
    [...]
}
Run Code Online (Sandbox Code Playgroud)

在任何一种方法中,都要注意函数定义failure_msgsuccess_msg传递的方式,而不是它们在执行时给出的结果.


the*_*low 5

default_args 在任务级别扩展,因此它变成每个任务回调

在“default_args”之外的 DAG 标志级别应用属性