How to get the last two successful execution dates of an Airflow job?

Sai*_*eja 6 airflow

I need to get the last two successful execution dates of an Airflow job to use in the current run. Example:

Execution date   Job status
2020-05-03       success
2020-05-04       failed
2020-05-05       success

Question: when I run the job on May 6, I should get the values for May 3 and May 5 into variables. Is that possible?

y2k*_*ham 5

You can leverage SQLAlchemy magic to retrieve the execution_date values of the last n successful runs:

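from typing import List, Optional

from pendulum import Pendulum  # pendulum 1.x, as used by Airflow 1.10.x; pendulum 2.x names this class DateTime
from airflow.utils.state import State
from airflow.settings import Session
from airflow.models.taskinstance import TaskInstance
from airflow.utils.db import provide_session  # Airflow 1.10.x path; in Airflow 2.x it lives in airflow.utils.session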

def last_execution_date(
    dag_id: str, task_id: str, n: int, session: Optional[Session] = None
) -> List[Pendulum]:
    """
    Query Airflow's task_instance table and return the execution dates
    of the most recent successful runs of a task.
    Args:
        dag_id: dag name
        task_id: task name
        n: number of runs
        session: session connected to the Airflow Postgres db
    Returns:
        list of execution dates of the n most recent successful runs, newest first
    """
    query_val = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == dag_id,
            TaskInstance.task_id == task_id,
            TaskInstance.state == State.SUCCESS,
        )
        .order_by(TaskInstance.execution_date.desc())
        .limit(n)
    )
    execution_dates: List[Pendulum] = [ti.execution_date for ti in query_val]
    return execution_dates

# The function above can serve as a utility; wrap it with provide_session so a session is supplied automatically:

last_success_date_fn = provide_session(last_execution_date)  # the provide_session decorator can be applied as is

This snippet has been tested end to end and can be used in production.

I referred to the tree() method in views.py when writing this script.


Alternatively, you can run this SQL query against Airflow's metadata database to retrieve the execution dates of the last n successful runs:

SELECT execution_date
FROM task_instance
WHERE dag_id = 'my_dag_id'
  AND task_id = 'my_task_id'
  AND state = 'success'
ORDER BY execution_date DESC
LIMIT n
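
To try the query without touching a real Airflow installation, here is a minimal sketch that runs it against an in-memory SQLite table. The table is only a stand-in: it carries just the four columns the query reads, whereas the real task_instance table has many more and usually lives in Postgres or MySQL. The helper name last_successful_dates is hypothetical, and the rows are the example from the question.

```python
import sqlite3


def last_successful_dates(conn, dag_id, task_id, n):
    """Return the n most recent execution dates with state 'success', newest first."""
    rows = conn.execute(
        """
        SELECT execution_date
        FROM task_instance
        WHERE dag_id = ?
          AND task_id = ?
          AND state = 'success'
        ORDER BY execution_date DESC
        LIMIT ?
        """,
        (dag_id, task_id, n),
    ).fetchall()
    return [row[0] for row in rows]


# Stand-in for the metadata database (assumption: only the columns the query uses).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance "
    "(dag_id TEXT, task_id TEXT, execution_date TEXT, state TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("my_dag_id", "my_task_id", "2020-05-03", "success"),
        ("my_dag_id", "my_task_id", "2020-05-04", "failed"),
        ("my_dag_id", "my_task_id", "2020-05-05", "success"),
    ],
)
conn.commit()

dates = last_successful_dates(conn, "my_dag_id", "my_task_id", 2)
print(dates)  # ['2020-05-05', '2020-05-03']
```

The failed run on 2020-05-04 is skipped, so a run on May 6 would see May 5 and May 3. Binding n as a query parameter instead of formatting it into the SQL string keeps the statement safe to reuse with different limits.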