我认为这里prev_execution_date列出的宏可以让我获得上次 DAG 运行的执行日期,但查看源代码似乎只能根据 DAG 计划获得最后日期。
prev_execution_date = task.dag.previous_schedule(self.execution_date)
Run Code Online (Sandbox Code Playgroud)
当 DAG 未按计划运行时,是否有任何方法可以通过宏获取 DAG 的执行日期?
Cha*_*man 10
是的,您可以为此定义自己的自定义宏,如下所示:
# custom macro function
def get_last_dag_run(dag):
last_dag_run = dag.get_last_dagrun()
if last_dag_run is None:
return "no prev run"
else:
return last_dag_run.execution_date.strftime("%Y-%m-%d")
# add macro in user_defined_macros in dag definition
dag = DAG(dag_id="my_test_dag",
schedule_interval='@daily',
user_defined_macros={
'last_dag_run_execution_date': get_last_dag_run
}
)
# example of using it in practice
print_vals = BashOperator(
task_id='print_vals',
bash_command='echo {{ last_dag_run_execution_date(dag) }}',
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
请注意, dag.get_last_run() 只是 Dag 对象上可用的众多函数之一。这是我找到它的地方:https : //github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/models.py#L3396
您还可以调整日期格式的字符串格式,以及如果之前没有运行过您想要输出的内容。
| 归档时间: |
|
| 查看次数: |
10685 次 |
| 最近记录: |