Rav*_*avi 6 airflow airflow-scheduler
We have the following command, which clears the failed tasks so they can all be re-run in one go:
```shell
airflow clear [-s START_DATE] [-e END_DATE] --only_failed dag_id
```
Is there a way to get information about all failed tasks across all DAGs and export it to a file (Excel or text)?
Here is an untested code snippet that should help.

Get the list of failed `TaskInstance`s (you can modify it to add filters such as `dag_id` and `start_date`):
```python
from typing import List, Optional

from sqlalchemy.orm import Session

from airflow.models.taskinstance import TaskInstance
from airflow.utils.state import State

try:  # Airflow 2.x location
    from airflow.utils.session import provide_session
except ImportError:  # Airflow 1.10.x location
    from airflow.utils.db import provide_session


@provide_session
def get_failed_task_instances(session: Optional[Session] = None) -> List[TaskInstance]:
    """
    Returns list of failed TaskInstance(s)
    - for all DAGs since inception of time
    - sorted by (1) dag_id ASC (2) start_date DESC
    :param session: Optional[Session] (injected by @provide_session when omitted)
    :return: List[TaskInstance]
    """
    failed_task_instances: List[TaskInstance] = (
        session.query(TaskInstance)
        .filter(TaskInstance.state == State.FAILED)
        .order_by(TaskInstance.dag_id.asc(), TaskInstance.start_date.desc())
        .all()
    )
    return failed_task_instances
```
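To make the suggested `dag_id` / `start_date` filters concrete without a running Airflow install, here is a sketch of the same query written as plain SQL against an in-memory SQLite table. Assumptions: the table is a four-column stand-in for Airflow's `task_instance` metadata table (the real one has many more columns), `query_failed` is a hypothetical helper, and the rows are made-up sample data. In the ORM version above, the equivalent would be chaining extra clauses such as `.filter(TaskInstance.dag_id == dag_id)` or `.filter(TaskInstance.start_date >= since)` before `.order_by(...)`.

```python
import sqlite3
from typing import List, Optional, Tuple

# Assumption: a trimmed-down stand-in for Airflow's "task_instance" table
SCHEMA = """
CREATE TABLE task_instance (
    dag_id     TEXT NOT NULL,
    task_id    TEXT NOT NULL,
    state      TEXT,
    start_date TEXT
)
"""


def query_failed(
    conn: sqlite3.Connection,
    dag_id: Optional[str] = None,
    start_date_from: Optional[str] = None,
) -> List[Tuple[str, str, str]]:
    """Failed task instances, optionally filtered by dag_id and/or start_date."""
    sql = "SELECT dag_id, start_date, task_id FROM task_instance WHERE state = ?"
    params: List[str] = ["failed"]
    if dag_id is not None:
        sql += " AND dag_id = ?"
        params.append(dag_id)
    if start_date_from is not None:
        # ISO-8601 strings sort chronologically when compared as text
        sql += " AND start_date >= ?"
        params.append(start_date_from)
    # same ordering as get_failed_task_instances(): dag_id ASC, start_date DESC
    sql += " ORDER BY dag_id ASC, start_date DESC"
    return conn.execute(sql, params).fetchall()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    conn.executemany(
        "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
        [  # made-up sample rows
            ("dag_a", "extract", "failed", "2021-01-02T00:00:00"),
            ("dag_a", "load", "success", "2021-01-02T01:00:00"),
            ("dag_b", "transform", "failed", "2021-01-01T00:00:00"),
            ("dag_a", "extract", "failed", "2021-01-03T00:00:00"),
        ],
    )
    print(query_failed(conn, dag_id="dag_a", start_date_from="2021-01-03"))
    # [('dag_a', '2021-01-03T00:00:00', 'extract')]
```

Binding values through `?` placeholders, rather than formatting them into the SQL string, keeps the optional filters safe to combine.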
A utility function that extracts the relevant bits (`dag_id`, `start_date` and `task_id`) from a `TaskInstance` (change it to suit your needs):
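```python
def ti_to_string(ti: TaskInstance) -> List[str]:
    """
    Converts a TaskInstance into a List[str] by extracting relevant bits of info from it
    :param ti: TaskInstance
    :return: List[str]
    """
    # start_date is a datetime (or None if the task never started), so turn it into a string
    start_date: str = ti.start_date.isoformat() if ti.start_date is not None else ""
    return [ti.dag_id, start_date, ti.task_id]
```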
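If you need more columns than those three, one option is a table-driven variant that reads attributes by name and converts `None` and `datetime` values to strings, so every cell is CSV-safe. `ti_to_row` and `FIELDS` are hypothetical names; `end_date` and `try_number` are `TaskInstance` attributes, but verify them against your Airflow version. A `SimpleNamespace` stands in for a real `TaskInstance` so the sketch runs without Airflow.

```python
from datetime import datetime, timezone
from types import SimpleNamespace
from typing import Any, List, Sequence

# Hypothetical column selection (adjust to the attributes you need)
FIELDS: Sequence[str] = ("dag_id", "task_id", "start_date", "end_date", "try_number")


def ti_to_row(ti: Any, fields: Sequence[str] = FIELDS) -> List[str]:
    """Read each attribute by name and turn it into a CSV-safe string."""
    row: List[str] = []
    for name in fields:
        value = getattr(ti, name, None)
        if value is None:
            row.append("")  # e.g. start_date of a task that never started
        elif isinstance(value, datetime):
            row.append(value.isoformat())
        else:
            row.append(str(value))
    return row


if __name__ == "__main__":
    # SimpleNamespace stands in for a TaskInstance; values are made up
    ti = SimpleNamespace(
        dag_id="dag_a",
        task_id="extract",
        start_date=datetime(2021, 1, 2, tzinfo=timezone.utc),
        end_date=None,
        try_number=2,
    )
    print(ti_to_row(ti))
    # ['dag_a', 'extract', '2021-01-02T00:00:00+00:00', '', '2']
```

The same `FIELDS` tuple can double as the CSV header row, so the header and the data columns cannot drift apart.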
Putting it all together: write the data to an output CSV file
```python
import csv
from typing import List


def write_failed_task_instances_to_csv(output_file_path: str) -> None:
    """
    Writes list of failed tasks in the provided output CSV filepath
    :param output_file_path: path of the CSV file to write (overwritten if it exists)
    :return: None
    """
    # prepare list of failed TaskInstance(s)
    failed_task_instances: List[TaskInstance] = get_failed_task_instances()
    # extract relevant bits of info from TaskInstance(s) list (to serialize them)
    failed_task_instances_data: List[List[str]] = list(map(ti_to_string, failed_task_instances))
    # write data of failed TaskInstance(s) to output CSV filepath
    with open(output_file_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerows(failed_task_instances_data)
```
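Since the question asks for Excel or plain-text output, here is a sketch of a writer that adds a header row and can switch between comma- and tab-separated output using the `csv` module's built-in `excel` and `excel-tab` dialects. `write_rows` is a hypothetical helper; the `utf-8-sig` encoding writes a byte-order mark, which helps Excel recognise the file as UTF-8 when DAG or task ids contain non-ASCII characters.

```python
import csv
import os
import tempfile
from typing import Iterable, List, Sequence


def write_rows(path: str, header: Sequence[str], rows: Iterable[List[str]],
               tab_separated: bool = False) -> None:
    """Write a header row followed by data rows to a CSV (or TSV) file."""
    # "excel-tab" is the csv module's built-in tab-separated dialect
    dialect = "excel-tab" if tab_separated else "excel"
    # utf-8-sig prepends a BOM so Excel detects the encoding as UTF-8
    with open(path, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.writer(f, dialect=dialect)
        writer.writerow(header)
        writer.writerows(rows)


if __name__ == "__main__":
    # made-up sample row, in the dag_id, start_date, task_id order of ti_to_string()
    rows = [["dag_a", "2021-01-02T00:00:00", "extract"]]
    out_path = os.path.join(tempfile.mkdtemp(), "failed_tasks.csv")
    write_rows(out_path, ["dag_id", "start_date", "task_id"], rows)
    with open(out_path, newline="", encoding="utf-8-sig") as f:
        print(list(csv.reader(f)))
        # [['dag_id', 'start_date', 'task_id'], ['dag_a', '2021-01-02T00:00:00', 'extract']]
```

In `write_failed_task_instances_to_csv`, a call like this could replace the two `csv.writer` lines, passing `failed_task_instances_data` as `rows`.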
References
views.py: this file is the best place to discover Airflow's SQLAlchemy magic