我试图在触发dag运行时将以下配置参数传递给Airflow CLI.以下是我正在使用的trigger_dag命令.
airflow trigger_dag -c '{"account_list":"[1,2,3,4,5]", "start_date":"2016-04-25"}' insights_assembly_9900
Run Code Online (Sandbox Code Playgroud)
我的问题是如何在dag运行中访问运算符内传递的con参数.
小智 14
如果您尝试访问 Airflow 系统范围的配置(而不是 DAG 配置),以下内容可能会有所帮助:
首先,导入这个
from airflow.configuration import conf
Run Code Online (Sandbox Code Playgroud)
其次,在某处获取值
conf.get("core", "my_key")
Run Code Online (Sandbox Code Playgroud)
可能的话,设置一个值
conf.set("core", "my_key", "my_val")
Run Code Online (Sandbox Code Playgroud)
Arn*_*was 13
这可能是所提供答案的延续devj。
在airflow.cfg以下属性应设置为true:
dag_run_conf_overrides_params=True
在定义PythonOperator时,传递以下参数provide_context=True。例如:
get_row_count_operator = PythonOperator(task_id ='get_row_count',python_callable = do_work,dag = dag,Provide_context = True)
**kwargs):def do_work(** kwargs):
table_name = kwargs ['dag_run']。conf.get('table_name')
#其余代码
气流trigger_dag read_hive --conf'{“ table_name”:“ my_table_name”}'
我发现此讨论很有帮助。
有两种方法可以访问airflow trigger_dag命令中传递的参数.
在PythonOperator中定义的可调用方法中,可以访问params as kwargs['dag_run'].conf.get('account_list')
鉴于你正在使用这个东西的领域是一个可以使用的领域 {{ dag_run.conf['account_list'] }}
该schedule_interval用于外部触发-DAG能够被设定为None用于上述方法的工作
| 归档时间: |
|
| 查看次数: |
6334 次 |
| 最近记录: |