如何使用 jinja 模板通过任务装饰器访问 Airflow 变量?

Pau*_*ung 6 airflow airflow-taskflow

我当前正在访问气流变量,如下所示:

from airflow.models import Variable

s3_bucket = Variable.get('bucket_name')
Run Code Online (Sandbox Code Playgroud)

它有效,但我被要求不要使用变量模块并使用 jinja 模板代替(即):

s3_bucket = '{{ var.value.bucket_name }}'
Run Code Online (Sandbox Code Playgroud)

问题是当我在气流模板(例如 PythonOperator/BashOperator)中使用 jinja 时,它可以工作,但我无法让它以任务流 API 形式工作。该变量被读取为字符串文字。例子:

# Pretend DAG defined here

@task
def example_task():
    s3_bucket = '{{ var.value.bucket_name }}'
    print(s3_bucket)

example_task()
Run Code Online (Sandbox Code Playgroud)

上面的代码将打印“{{ var.value.bucket_name }}”而不是bucket_name值。

Ela*_*lad 7

它可以工作,但我被要求不要使用变量模块并使用 jinja 模板代替

这不是准确的建议,我将解释原因。

这样做绝对没有问题:

@task
def example_task():
    s3_bucket = Variable.get('bucket_name')
    print(s3_bucket)

example_task()
Run Code Online (Sandbox Code Playgroud)

Variable.get()您应该避免在顶级代码中使用。在调用的 python 可调用中使用它PythonOperator是完全安全的。

Airflow 不断解析您的.py文件以搜索 DAG 中的更改。这也意味着您作为顶级编写的任何代码都会在解析过程运行时执行。由于解析每 30 秒执行一次(默认值min_file_process_interval),这会给您的后端元存储带来压力。现在考虑一下,您的实例正在随着越来越多的 DAG 使用相同的方法而增长 - 您最终可能会由于容量过大而无法访问数据库。您实际上是在“攻击”您自己的数据库。这导致建议使用宏,就像使用宏一样,您永远不会面临对数据库造成压力的风险,因为宏仅在运行时进行评估。但这并不意味着您应该避免使用Variable.get()在有用时避免使用它。如果您没有正确使用宏,您将收到语法错误(就像您所经历的那样)。

澄清一下 - 可以Variable.get()在任何非顶级代码的代码部分中使用。