Airflow 如何设置 dag_run.conf 的默认值

Set*_*nig 23 airflow

我正在尝试设置一个 Airflow DAG,它提供可从dag_run.conf. 当使用“Run w/ Config”选项从 webUI 运行 DAG 时,这非常有效。但是,当按计划运行时,dag_run.conf字典不存在,并且任务将失败,例如

jinja2.exceptions.UndefinedError: 'dict object' has no attribute 'key1'
Run Code Online (Sandbox Code Playgroud)

下面是一个示例作业。

是否可以使其dag_run.conf始终包含此处定义的字典params

jinja2.exceptions.UndefinedError: 'dict object' has no attribute 'key1'
Run Code Online (Sandbox Code Playgroud)

我见过的最接近的是For Apache Airflow,如何通过 CLI 手动触发 DAG 时传递参数?,但是他们利用 Jinja 和 if/else - 这需要定义这些默认参数两次。我只想定义它们一次。

Nic*_*coE 23

您可以使用 DAG参数来实现您正在寻找的内容:

\n
\n

params (dict) \xe2\x80\x93 DAG 级别参数的字典,可在模板中访问,在 params 下命名。这些参数可以在任务级别被覆盖。

\n
\n

您可以params在 DAG 或任务级别进行定义,也可以从 UI 中的“触发 DAG w/配置”部分添加或修改它们。

\n

有向无环图示例:

\n
default_args = {\n    "owner": "airflow",\n}\n\ndag = DAG(\n    dag_id="example_dag_params",\n    default_args=default_args,\n    schedule_interval="*/5 * * * *",\n    start_date=days_ago(1),\n    params={"param1": "first_param"},\n    catchup=False,\n)\nwith dag:\n\n    bash_task = BashOperator(\n        task_id="bash_task", bash_command="echo bash_task: {{ params.param1 }}"\n    )\n
Run Code Online (Sandbox Code Playgroud)\n

输出日志

\n
[2021-10-02 20:23:25,808] {logging_mixin.py:104} INFO - Running <TaskInstance: example_dag_params.bash_task 2021-10-02T23:15:00+00:00 [running]> on host worker_01\n[2021-10-02 20:23:25,867] {taskinstance.py:1302} INFO - Exporting the following env vars:\nAIRFLOW_CTX_DAG_OWNER=***\nAIRFLOW_CTX_DAG_ID=example_dag_params\nAIRFLOW_CTX_TASK_ID=bash_task\nAIRFLOW_CTX_EXECUTION_DATE=2021-10-02T23:15:00+00:00\nAIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-10-02T23:15:00+00:00\n[2021-10-02 20:23:25,870] {subprocess.py:52} INFO - Tmp dir root location: \n /tmp\n[2021-10-02 20:23:25,871] {subprocess.py:63} INFO - Running command: [\'bash\', \'-c\', \'echo bash_task: first_param\']\n[2021-10-02 20:23:25,884] {subprocess.py:74} INFO - Output:\n[2021-10-02 20:23:25,886] {subprocess.py:78} INFO - bash_task: first_param\n[2021-10-02 20:23:25,887] {subprocess.py:82} INFO - Command exited with return code 0\n
Run Code Online (Sandbox Code Playgroud)\n

从日志中,请注意已dag_run计划并且参数仍然存在。

\n

您可以在此答案中找到有关使用参数的更广泛的示例。

\n

希望这对你有用!

\n

  • 这有效,谢谢!奇怪的是,触发器页面和文档建议使用“dag_run.conf”,恕我直言,这显然是一个更明智的模式 (5认同)