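I need to get the last two successful execution dates of an Airflow job to use in the current run. Example:

Execution date   Job status
2020-05-03       success
2020-05-04       failed
2020-05-05       success

Question: when I run the job on May 6, I should get the values for May 3 and May 5 into variables. Is this possible?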
You can leverage SQLAlchemy magic to retrieve the execution_date values of the last n successful runs:
from typing import List, Optional

from pendulum import Pendulum
from airflow.models.taskinstance import TaskInstance
from airflow.settings import Session
from airflow.utils.db import provide_session  # Airflow 1.10.x; in Airflow 2.x import it from airflow.utils.session
from airflow.utils.state import State


def last_execution_date(
    dag_id: str, task_id: str, n: int, session: Optional[Session] = None
) -> List[Pendulum]:
    """
    Query the Airflow task_instance table and return the execution
    dates of the most recent successful runs.

    Args:
        dag_id: dag name
        task_id: task name
        n: number of runs
        session: Session to connect to the Airflow postgres db

    Returns:
        list of execution dates of the n most recent successful runs, newest first
    """
    query_val = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == dag_id,
            TaskInstance.task_id == task_id,
            TaskInstance.state == State.SUCCESS,
        )
        .order_by(TaskInstance.execution_date.desc())
        .limit(n)
    )
    execution_dates: List[Pendulum] = [ti.execution_date for ti in query_val]
    return execution_dates


# The function above can serve as a utility function and be wrapped with provide_session as below:
last_success_date_fn = provide_session(last_execution_date)  # the provide_session decorator can be used as is.
This snippet has been tested end to end and can be used in production.
Alternatively, you can run this SQL query against Airflow's metadata database to retrieve the execution dates of the last n successful runs:
SELECT execution_date
FROM task_instance
WHERE dag_id = 'my_dag_id'
AND task_id = 'my_task_id'
AND state = 'success'
ORDER BY execution_date DESC
LIMIT n
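To see what the query returns for the dates in the question, here is a minimal sketch that runs it from Python. It assumes an in-memory SQLite table as a stand-in for the metadata database, with only the four columns the query touches and plain ISO date strings instead of real timestamps. The helper name last_successful_execution_dates is hypothetical, and a real deployment would connect to the actual metadata database, where the placeholder style depends on the driver.

```python
import sqlite3


def last_successful_execution_dates(conn, dag_id, task_id, n):
    """Return the execution dates of the n most recent successful runs, newest first."""
    rows = conn.execute(
        """
        SELECT execution_date
        FROM task_instance
        WHERE dag_id = ?
          AND task_id = ?
          AND state = 'success'
        ORDER BY execution_date DESC
        LIMIT ?
        """,
        (dag_id, task_id, n),
    ).fetchall()
    return [row[0] for row in rows]


# Assumption: a simplified stand-in for Airflow's task_instance table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance "
    "(dag_id TEXT, task_id TEXT, execution_date TEXT, state TEXT)"
)
# Sample rows taken from the question.
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("my_dag_id", "my_task_id", "2020-05-03", "success"),
        ("my_dag_id", "my_task_id", "2020-05-04", "failed"),
        ("my_dag_id", "my_task_id", "2020-05-05", "success"),
    ],
)

# Unpack the two most recent successful dates into variables.
latest, previous = last_successful_execution_dates(conn, "my_dag_id", "my_task_id", 2)
print(latest, previous)  # 2020-05-05 2020-05-03
```

The failed run on May 4 is filtered out by the state condition, so a run on May 6 would see May 5 and May 3.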