我使用Airflow来管理ETL任务的执行和计划。已创建DAG,并且工作正常。但是通过cli手动触发dag时可以传递参数。
例如:我的DAG每天在01:30运行,并处理昨天的数据(时间范围从昨天的01:30到今天的01:30)。数据源可能存在一些问题。我需要重新处理这些数据(手动指定时间范围)。
因此,我可以在预定的时候创建这样的气流DAG,使其默认时间范围为昨天的01:30到今天的01:30。然后,如果数据源有任何问题,我需要手动触发DAG并手动将时间范围作为参数传递。
据我所知airflow test有-tp可以通过PARAMS的任务。但这仅用于测试特定任务。并且airflow trigger_dag没有-tp选择。那么有没有办法将tigger_dag传递给DAG,然后操作员可以读取这些参数?
谢谢!
kax*_*xil 13
You can pass parameters from the CLI using --conf '{"key":"value"}' and then use it in the DAG file as "{{ dag_run.conf["key"] }}" in templated field.
CLI:
airflow trigger_dag 'example_dag_conf' -r 'run_id' --conf '{"message":"value"}'
Run Code Online (Sandbox Code Playgroud)
DAG File:
args = {
'start_date': datetime.utcnow(),
'owner': 'airflow',
}
dag = DAG(
dag_id='example_dag_conf',
default_args=args,
schedule_interval=None,
)
def run_this_func(ds, **kwargs):
print("Remotely received value of {} for key=message".
format(kwargs['dag_run'].conf['message']))
run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=run_this_func,
dag=dag,
)
# You can also access the DagRun object in templates
bash_task = BashOperator(
task_id="bash_task",
bash_command='echo "Here is the message: '
'{{ dag_run.conf["message"] if dag_run else "" }}" ',
dag=dag,
)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5643 次 |
| 最近记录: |