针对任何 DAG 故障的全球警报

Maw*_*Pls 0 airflow

我目前有超过 100 个 DAG 在生产中运行。我知道如何添加on_failure_callback由上游故障触发的操作员的警报,但是有没有一种方法可以将 Airflow 本身配置为在 DAG 发生故障时始终发送电子邮件,而无需遍历并更新我的每个 DAG 来发出警报个别失败?

Dan*_*ang 6

据我所知,还没有,但我有这个助手来处理我的全局/默认 dag/操作符设置:

def on_failure_callback(context):
    ...

def on_success_callback(context):
    ...

def build_default_args(**kwargs):
    default_args = {
        'on_failure_callback': on_failure_callback,
        'on_success_callback': on_success_callback,
        'owner': 'me',
        'queue': 'default',
        'execution_timeout': timedelta(hours=1),
        'retries': 3,
        'retry_delay': timedelta(seconds=10),
    }
    default_args.update(kwargs)
    return default_args
Run Code Online (Sandbox Code Playgroud)

然后在每个 DAG 中:

dag = DAG(
    dag_id='my_dag',
    default_args=build_default_args(
        start_date=datetime(2017, 9, 20),
        execution_timeout=timedelta(hours=8),  # overrides default
    ),
    schedule_interval='@hourly',
)
Run Code Online (Sandbox Code Playgroud)

或者一些自定义基DAG类......但无论哪种方式,您仍然必须返回并更改您的 100 多个 DAG 一次。