如何查找Airflow中失败的上游任务数量?

mad*_*ad_ 4 airflow

我很难弄清楚如何找到在同一天(同一执行日)运行两次的同一个 dag 运行的失败任务。

考虑一个例子,当 dagdag_id=1在第一次运行时失败(由于任何原因,可能是连接超时)并且任务失败。当我们尝试查询时,TaskInstance 表将包含失败任务的条目。伟大的!!

但是,如果我重新运行相同的 dag(注意 dag_id 仍然是 1),那么在最后一个任务中(它的规则是,ALL_DONE无论上游任务是失败还是成功,它都会被执行)我想计算当前 dag_run 中失败的任务数,忽略之前的 dag_runs。我遇到了 dag_run id,如果我们可以将它与 TaskInstance 关联起来,它可能会很有用,但我不能。任何建议/帮助表示赞赏。

hea*_*ock 6

在 Airflow 1.10.x 中,可以通过更简单的代码来实现相同的结果,从而避免直接接触 ORM。

from airflow.utils.state import State

def your_python_operator_callable(**context):    
    tis_dagrun = context['ti'].get_dagrun().get_task_instances()
    failed_count = sum([True if ti.state == State.FAILED else False for ti in tis_dagrun])
    print(f"There are {failed_count} failed tasks in this execution"
Run Code Online (Sandbox Code Playgroud)

一个不幸的问题是,context['ti'].get_dagrun()从 CLI 运行单个任务的测试时不会返回 DAGRun 实例。实际上,该单个任务的手动测试将失败,但标准运行将按预期工作。