如何获得在气流中运行的 dag 的最新执行时间

Rav*_*avi 4 airflow

我试过下面的代码,但我仍然遇到问题

from airflow.models DagModel

def get_latest_execution_date(**kwargs):

session = airflow.settings.Session()

f = open("/home/Insurance/InsuranceDagsTimestamp.txt","w+")

try:
    Insurance_last_dag_run = session.query(DagModel)
    for Insdgrun in Insurance_last_dag_run:
        if Insdgrun is None: 
            f.write(Insdgrun.dag_id+",9999-12-31"+"\n")
        else:
            f.write(Insdgrun.dag_id+","+ Insdgrun.execution_date+"\n")
except:
    session.rollback()
finally:
    session.close()

t1 = PythonOperator(
    task_id='records',
    provide_context=True,
    python_callable=get_latest_execution_date,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

有什么方法可以修复和获取最新的 dag 运行时信息

Jos*_*osh 6

有多种方法可以获取 DagRun 的最新执行情况。一种方法是利用 Airflow DagRun 模型。

from airflow.models import DagRun

def get_most_recent_dag_run(dag_id):
    dag_runs = DagRun.find(dag_id=dag_id)
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    return dag_runs[0] if dag_runs else None


dag_run = get_most_recent_dag_run('fake-dag-id-001')
if dag_run:
    print(f'The most recent DagRun was executed at: {dag_run.execution_date}')
Run Code Online (Sandbox Code Playgroud)

您可以在位于此处Airflow Docs 中找到有关 DagRun 模型及其属性的更多信息。


Olu*_*ule 1

PythonOperator op_args参数已模板化。

该可调用函数仅将最新执行日期写入文件,因此您可以通过以下方式实现该函数:

def store_last_execution_date(execution_date):
    '''Appends latest execution date to a file
    :param execution_date: The last execution date of the DagRun.
    '''

    with open("/home/Insurance/InsuranceDagsTimestamp.txt", "w+") as f:
        f.write(execution_date)


t1 = PythonOperator(
         task_id="records",
         provide_context=True,
         python_callable=store_last_execution_date,
         op_args=[
             "{{dag.get_latest_execution_date()}}",
         ],
         dag=dag
     )
Run Code Online (Sandbox Code Playgroud)