How to get a list of all failed tasks from different DAGs

Rav*_*avi 6 airflow airflow-scheduler

We have the following command, which clears failed tasks so that they are all re-run in one go:

airflow clear [-s START_DATE] [-e END_DATE] --only_failed dag_id

Is there a way to get information about all failed tasks across all DAGs and export it to a file (Excel or plain text)?

y2k*_*ham 6

Here is an untested code snippet that should help:

  • Get the list of failed TaskInstances (you can modify it to add filters, e.g. on dag_id or start_date)

     from typing import List, Optional
     
     from sqlalchemy.orm import Session
     from airflow.models.taskinstance import TaskInstance
     from airflow.utils.state import State
     from airflow.utils.db import provide_session  # Airflow 2.x: airflow.utils.session
     
     @provide_session
     def get_failed_task_instances(session: Optional[Session] = None) -> List[TaskInstance]:
         """
         Returns list of failed TaskInstance(s)
          - for all DAGs since inception of time
          - sorted by (1) dag_id ASC (2) start_date DESC
         :param session: Optional[Session] (injected by @provide_session when omitted)
         :return: List[TaskInstance]
         """
         failed_task_instances: List[TaskInstance] = (
             session.query(TaskInstance)
             .filter(TaskInstance.state == State.FAILED)
             .order_by(TaskInstance.dag_id.asc(), TaskInstance.start_date.desc())
             .all()
         )
         return failed_task_instances
    
  • A utility function that extracts the relevant bits (dag_id, start_date and task_id) from a TaskInstance (change it to suit your needs)

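     def ti_to_string(ti: TaskInstance) -> List[str]:
         """
         Converts a TaskInstance into a List[str] by extracting relevant bits of info from it
         :param ti: TaskInstance
         :return: List[str]
         """
         # start_date is a datetime (or None if the task never started), so stringify it
         start_date = ti.start_date.isoformat() if ti.start_date else ""
         return [ti.dag_id, start_date, ti.task_id]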
    
  • Putting it all together: write the data to an output CSV file (which Excel can open)

     import csv
     
     def write_failed_task_instances_to_csv(output_file_path: str) -> None:
         """
         Writes list of failed tasks to the provided output CSV filepath
         :param output_file_path: path of the CSV file to create (overwritten if it exists)
         :return: None
         """
         # prepare list of failed TaskInstance(s)
         failed_task_instances: List[TaskInstance] = get_failed_task_instances()
         # extract relevant bits of info from TaskInstance(s) list (to serialize them)
         failed_task_instances_data: List[List[str]] = [ti_to_string(ti) for ti in failed_task_instances]
         # write a header row, then one row per failed TaskInstance
         with open(output_file_path, "w", newline="") as f:
             writer = csv.writer(f)
             writer.writerow(["dag_id", "start_date", "task_id"])
             writer.writerows(failed_task_instances_data)
    
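
If you would rather not import Airflow at all, roughly the same list can be read straight from Airflow's metadata database, whose `task_instance` table has `dag_id`, `task_id`, `state` and `start_date` columns (failed rows have `state = 'failed'`). The sketch below is a hedged, stdlib-only illustration, not part of the original answer: it assumes a SQLite-backed metadata DB (Airflow's default for local installs), and the function names, the `dag_id`/`since` filter parameters and the toy demo table are all hypothetical. The demo builds a tiny in-memory table instead of touching a real database, so it runs anywhere.

```python
import csv
import io
import sqlite3
from typing import List, Optional, Sequence, Tuple, TextIO

Row = Tuple[str, str, str]  # (dag_id, start_date, task_id)


def fetch_failed_task_instances(
    conn: sqlite3.Connection,
    dag_id: Optional[str] = None,  # hypothetical filter: a single DAG
    since: Optional[str] = None,   # hypothetical filter: start_date >= since
) -> List[Row]:
    """Failed task instances sorted by dag_id ASC, start_date DESC."""
    sql = "SELECT dag_id, start_date, task_id FROM task_instance WHERE state = ?"
    params: List[str] = ["failed"]
    if dag_id is not None:
        sql += " AND dag_id = ?"
        params.append(dag_id)
    if since is not None:
        # Assumes start_date is stored as sortable ISO-8601 text (true for the demo).
        sql += " AND start_date >= ?"
        params.append(since)
    sql += " ORDER BY dag_id ASC, start_date DESC"
    return conn.execute(sql, params).fetchall()


def write_rows_to_csv(rows: Sequence[Row], f: TextIO) -> None:
    """Write a header row followed by one CSV row per failed task instance."""
    writer = csv.writer(f)
    writer.writerow(["dag_id", "start_date", "task_id"])
    writer.writerows(rows)


def make_demo_connection() -> sqlite3.Connection:
    """Hypothetical toy stand-in for the metadata DB (only a few real columns)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, state TEXT, start_date TEXT)"
    )
    conn.executemany(
        "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
        [
            ("etl", "load", "failed", "2021-03-02 10:00:00"),
            ("etl", "extract", "success", "2021-03-02 09:00:00"),
            ("etl", "load", "failed", "2021-03-01 10:00:00"),
            ("reports", "email", "failed", "2021-03-03 08:00:00"),
        ],
    )
    return conn


if __name__ == "__main__":
    buf = io.StringIO()
    write_rows_to_csv(fetch_failed_task_instances(make_demo_connection()), buf)
    print(buf.getvalue())
```

To use it against a real installation you would open the metadata DB instead of the demo one, e.g. `sqlite3.connect(os.path.expanduser("~/airflow/airflow.db"))` for the default `AIRFLOW_HOME` (an assumption; check the `sql_alchemy_conn` setting in your `airflow.cfg`), and write to `open(path, "w", newline="")` instead of the `StringIO` buffer. With PostgreSQL or MySQL the same SQL should work through the corresponding driver, with its own placeholder syntax.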

References