访问通过CLI传递给Airflow的配置参数

dev*_*evj 11 python airflow

我试图在触发dag运行时将以下配置参数传递给Airflow CLI.以下是我正在使用的trigger_dag命令.

airflow trigger_dag  -c '{"account_list":"[1,2,3,4,5]", "start_date":"2016-04-25"}'  insights_assembly_9900 
Run Code Online (Sandbox Code Playgroud)

我的问题是如何在dag运行中访问运算符内传递的con参数.

小智 14

如果您尝试访问 Airflow 系统范围的配置(而不是 DAG 配置),以下内容可能会有所帮助:

首先,导入这个

from airflow.configuration import conf
Run Code Online (Sandbox Code Playgroud)

其次,在某处获取值

conf.get("core", "my_key")
Run Code Online (Sandbox Code Playgroud)

可能的话,设置一个值

conf.set("core", "my_key", "my_val")
Run Code Online (Sandbox Code Playgroud)

  • 这是一个完全不同的conf,与DAG运行conf无关。 (13认同)
  • 我认为这个问题的区别不是那么清楚。这对我还是有帮助的!谢谢 (6认同)

Arn*_*was 13

这可能是所提供答案的延续devj

  1. airflow.cfg以下属性应设置为true: dag_run_conf_overrides_params=True

  2. 在定义PythonOperator时,传递以下参数provide_context=True。例如:

get_row_count_operator = PythonOperator(task_id ='get_row_count',python_callable = do_work,dag = dag,Provide_context = True)
  1. 定义可调用的python(注意的使用**kwargs):
def do_work(** kwargs):    
    table_name = kwargs ['dag_run']。conf.get('table_name')    
    #其余代码
  1. 从命令行调用dag:
气流trigger_dag read_hive --conf'{“ table_name”:“ my_table_name”}'

我发现讨论很有帮助。

  • 我收到错误 dag_run not fount on Airflow server (4认同)

dev*_*evj 8

有两种方法可以访问airflow trigger_dag命令中传递的参数.

  1. 在PythonOperator中定义的可调用方法中,可以访问params as kwargs['dag_run'].conf.get('account_list')

  2. 鉴于你正在使用这个东西的领域是一个可以使用的领域 {{ dag_run.conf['account_list'] }}

schedule_interval用于外部触发-DAG能够被设定为None用于上述方法的工作

  • 有没有办法从`with DAG()中访问`dag_run`作为dag:`block?我想根据是否存在`conf`键来解析`params`值到任务中,如果不存在,则取一个`default_arg`值(而不是在jinja模板中加入过多的逻辑). (7认同)
  • 也许 ```conf = dag.get_dagrun(execution_date=dag.latest_execution_date).conf``` (2认同)