用于获取上次 dag 运行执行时间的 Apache 气流宏

And*_*idy 8 airflow

我认为这里prev_execution_date列出的宏可以让我获得上次 DAG 运行的执行日期,但查看源代码似乎只能根据 DAG 计划获得最后日期。

prev_execution_date = task.dag.previous_schedule(self.execution_date)
Run Code Online (Sandbox Code Playgroud)

当 DAG 未按计划运行时,是否有任何方法可以通过宏获取 DAG 的执行日期?

Cha*_*man 10

是的,您可以为此定义自己的自定义宏,如下所示:

# custom macro function
def get_last_dag_run(dag):
    last_dag_run = dag.get_last_dagrun()
    if last_dag_run is None:
        return "no prev run"
    else:
        return last_dag_run.execution_date.strftime("%Y-%m-%d")

# add macro in user_defined_macros in dag definition
dag = DAG(dag_id="my_test_dag",
      schedule_interval='@daily',
      user_defined_macros={
          'last_dag_run_execution_date': get_last_dag_run
      }
)

# example of using it in practice
print_vals = BashOperator(
    task_id='print_vals',
    bash_command='echo {{ last_dag_run_execution_date(dag) }}',
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

请注意, dag.get_last_run() 只是 Dag 对象上可用的众多函数之一。这是我找到它的地方:https : //github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/models.py#L3396

您还可以调整日期格式的字符串格式,以及如果之前没有运行过您想要输出的内容。

  • dag.get_last_dagrun(include_externally_triggered=True) 用于外部触发 d​​ags (2认同)