我试过下面的代码,但我仍然遇到问题
from airflow.models DagModel
def get_latest_execution_date(**kwargs):
session = airflow.settings.Session()
f = open("/home/Insurance/InsuranceDagsTimestamp.txt","w+")
try:
Insurance_last_dag_run = session.query(DagModel)
for Insdgrun in Insurance_last_dag_run:
if Insdgrun is None:
f.write(Insdgrun.dag_id+",9999-12-31"+"\n")
else:
f.write(Insdgrun.dag_id+","+ Insdgrun.execution_date+"\n")
except:
session.rollback()
finally:
session.close()
t1 = PythonOperator(
task_id='records',
provide_context=True,
python_callable=get_latest_execution_date,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
有什么方法可以修复和获取最新的 dag 运行时信息
有多种方法可以获取 DagRun 的最新执行情况。一种方法是利用 Airflow DagRun 模型。
from airflow.models import DagRun
def get_most_recent_dag_run(dag_id):
dag_runs = DagRun.find(dag_id=dag_id)
dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
return dag_runs[0] if dag_runs else None
dag_run = get_most_recent_dag_run('fake-dag-id-001')
if dag_run:
print(f'The most recent DagRun was executed at: {dag_run.execution_date}')
Run Code Online (Sandbox Code Playgroud)
您可以在位于此处的Airflow Docs 中找到有关 DagRun 模型及其属性的更多信息。
该PythonOperator op_args参数已模板化。
该可调用函数仅将最新执行日期写入文件,因此您可以通过以下方式实现该函数:
def store_last_execution_date(execution_date):
'''Appends latest execution date to a file
:param execution_date: The last execution date of the DagRun.
'''
with open("/home/Insurance/InsuranceDagsTimestamp.txt", "w+") as f:
f.write(execution_date)
t1 = PythonOperator(
task_id="records",
provide_context=True,
python_callable=store_last_execution_date,
op_args=[
"{{dag.get_latest_execution_date()}}",
],
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3899 次 |
| 最近记录: |