Raf*_*bir 1 airflow airflow-scheduler
我有一个 DAG,我想用它来回填我的数据库表。
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from datetime import datetime, timedelta
args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 4, 1),
'retry_delay': timedelta(minutes=1),
}
dag = DAG(dag_id='airflow_backfill', default_args=args, schedule_interval='@daily')
"""
Task for inserting data per day
"""
task1 = PostgresOperator(
task_id='insert_new_row',
postgres_conn_id='aws_pg',
sql="INSERT INTO airflow_test(date_at) VALUES('2018-04-01')",
dag=dag,
)
task2 = PostgresOperator(
task_id='update_team_name',
postgres_conn_id='aws_pg',
sql="UPDATE airflow_test SET team_name = (SELECT team_name FROM teams ORDER BY RANDOM() LIMIT 1) WHERE team_name is NULL",
dag=dag,
)
task1.set_downstream(task2)
Run Code Online (Sandbox Code Playgroud)
我从 2018 年 4 月 1 日开始在数据库中插入一行,但问题是我对 date_at 变量进行了硬编码。
我的问题是,有什么方法可以将回填日期作为插入值吗?我想在回填时自动设置“date_at”的值,但没有找到任何可以自动获取回填日期的气流环境/配置变量。
我正在使用 Apache Airflow 1.9.0。谢谢。
编辑:您应该能够使用 jinja 模板来获取变量execution_date:
task1 = PostgresOperator(
task_id='insert_new_row',
postgres_conn_id='aws_pg',
sql="INSERT INTO airflow_test(date_at) VALUES('{{ ds }}')",
dag=dag,
)
Run Code Online (Sandbox Code Playgroud)
https://airflow.apache.org/code.html#default-variables