我认为这里prev_execution_date列出的宏可以让我获得上次 DAG 运行的执行日期,但查看源代码似乎只能根据 DAG 计划获得最后日期。
prev_execution_date = task.dag.previous_schedule(self.execution_date)
Run Code Online (Sandbox Code Playgroud)
当 DAG 未按计划运行时,是否有任何方法可以通过宏获取 DAG 的执行日期?
我希望在 Airflow 中创建一个转换,并且我想确保从上次运行 DAG 以更新我的目标表以来从我的源中获取所有数据。为此,我希望能够获得最近成功的执行。
我发现了这一点:Apache 气流宏获得最后一次 dag 运行执行时间,这使我到达最终目标的某个地方,但是,这只会获得 DAG 执行的最后一次,无论它是否成功。
SELECT col1, col2, col3
FROM schema.table
WHERE table.updated_at > '{{ last_dag_run_execution_date(dag) }}';
Run Code Online (Sandbox Code Playgroud)
如果执行失败(由于连接或类似原因),last_dag_run_execution_date(dag) 将更新,但我们错过了之前 DAG 运行的执行。
理想情况下,这将拉取最近的非失败执行。或者如果有人有任何想法我怎么能满足这个,请让我知道
我试过下面的代码,但我仍然遇到问题
from airflow.models DagModel
def get_latest_execution_date(**kwargs):
session = airflow.settings.Session()
f = open("/home/Insurance/InsuranceDagsTimestamp.txt","w+")
try:
Insurance_last_dag_run = session.query(DagModel)
for Insdgrun in Insurance_last_dag_run:
if Insdgrun is None:
f.write(Insdgrun.dag_id+",9999-12-31"+"\n")
else:
f.write(Insdgrun.dag_id+","+ Insdgrun.execution_date+"\n")
except:
session.rollback()
finally:
session.close()
t1 = PythonOperator(
task_id='records',
provide_context=True,
python_callable=get_latest_execution_date,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
有什么方法可以修复和获取最新的 dag 运行时信息
airflow ×3