如果任何任务失败,如何将Airflow DAG运行标记为失败?

Yan*_*SSE 8 airflow

如果任何任务失败,是否有可能使Airflow DAG失败?

我通常在DAG的末尾有一些清理任务,而现在,无论何时最后一个任务成功,整个DAG都被标记为成功。

GuD*_*GuD 12

另一种解决方案是添加一个最终的 PythonOperator 来检查这次运行中所有任务的状态:

final_status = PythonOperator(
    task_id='final_status',
    provide_context=True,
    python_callable=final_status,
    trigger_rule=TriggerRule.ALL_DONE, # Ensures this task runs even if upstream fails
    dag=dag,
)

def final_status(**kwargs):
    for task_instance in kwargs['dag_run'].get_task_instances():
        if task_instance.current_state() != State.SUCCESS and \
                task_instance.task_id != kwargs['task_instance'].task_id:
            raise Exception("Task {} failed. Failing this DAG run".format(task_instance.task_id))
Run Code Online (Sandbox Code Playgroud)