我正在使用 Apache Airflow 构建 ETL 管道。我必须提取添加到 SQL 数据库的最新数据(比如每天)。因此,我想构造一个查询,如下所示:
SELECT foo
FROM bar
WHERE insert_date >= "DATA_INTERVAL_START_HERE"
AND insert_date < "DATA_INTERVAL_END_HERE"
Run Code Online (Sandbox Code Playgroud)
要在任务中执行此查询(例如使用 pyodbc),我需要访问提取任务内 Dag Run 对象的数据间隔开始和结束时间。我怎样才能检索这些信息?
这里提出了类似的问题: Airflow ETL pipeline - using Schedule date in functions? 然而,自 Airflow 2.0 以来,答案并没有更新到 TaskFlow API。
访问数据间隔参数的简洁方法:
@dag(schedule_interval="@daily", start_date=datetime(2022, 2, 8), catchup=True)
def tutorial_access_data_interval():
@task()
def extract(data_interval_start=None, data_interval_end=None, **kwargs):
#Use data_interval_start, data_interval_end here
Run Code Online (Sandbox Code Playgroud)
Airflow引擎将默认提供参数。
参考:
https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#variables