我很难弄清楚如何找到在同一天(同一执行日)运行两次的同一个 dag 运行的失败任务。
考虑一个例子,当 dagdag_id=1在第一次运行时失败(由于任何原因,可能是连接超时)并且任务失败。当我们尝试查询时,TaskInstance 表将包含失败任务的条目。伟大的!!
但是,如果我重新运行相同的 dag(注意 dag_id 仍然是 1),那么在最后一个任务中(它的规则是,ALL_DONE无论上游任务是失败还是成功,它都会被执行)我想计算当前 dag_run 中失败的任务数,忽略之前的 dag_runs。我遇到了 dag_run id,如果我们可以将它与 TaskInstance 关联起来,它可能会很有用,但我不能。任何建议/帮助表示赞赏。
在 Airflow 1.10.x 中,可以通过更简单的代码来实现相同的结果,从而避免直接接触 ORM。
from airflow.utils.state import State
def your_python_operator_callable(**context):
tis_dagrun = context['ti'].get_dagrun().get_task_instances()
failed_count = sum([True if ti.state == State.FAILED else False for ti in tis_dagrun])
print(f"There are {failed_count} failed tasks in this execution"
Run Code Online (Sandbox Code Playgroud)
一个不幸的问题是,context['ti'].get_dagrun()从 CLI 运行单个任务的测试时不会返回 DAGRun 实例。实际上,该单个任务的手动测试将失败,但标准运行将按预期工作。
| 归档时间: |
|
| 查看次数: |
5890 次 |
| 最近记录: |