访问任务内 Dag Run 的数据间隔

Ser*_*nry 6 python airflow

我正在使用 Apache Airflow 构建 ETL 管道。我必须提取添加到 SQL 数据库的最新数据(比如每天)。因此,我想构造一个查询,如下所示:

SELECT foo
FROM bar
WHERE insert_date >= "DATA_INTERVAL_START_HERE"
   AND insert_date < "DATA_INTERVAL_END_HERE"
Run Code Online (Sandbox Code Playgroud)

要在任务中执行此查询(例如使用 pyodbc),我需要访问提取任务内 Dag Run 对象的数据间隔开始和结束时间。我怎样才能检索这些信息?

Ser*_*nry 4

这里提出了类似的问题: Airflow ETL pipeline - using Schedule date in functions? 然而,自 Airflow 2.0 以来,答案并没有更新到 TaskFlow API。

访问数据间隔参数的简洁方法:

@dag(schedule_interval="@daily", start_date=datetime(2022, 2, 8), catchup=True)
def tutorial_access_data_interval():
     @task()
     def extract(data_interval_start=None, data_interval_end=None, **kwargs):
          #Use data_interval_start, data_interval_end here
Run Code Online (Sandbox Code Playgroud)

Airflow引擎将默认提供参数。

参考:

https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#variables