小编Pau*_*ung的帖子

如何使用 jinja 模板通过任务装饰器访问 Airflow 变量?

我当前正在访问气流变量,如下所示:

from airflow.models import Variable

s3_bucket = Variable.get('bucket_name')
Run Code Online (Sandbox Code Playgroud)

它有效,但我被要求不要使用变量模块并使用 jinja 模板代替(即):

s3_bucket = '{{ var.value.bucket_name }}'
Run Code Online (Sandbox Code Playgroud)

问题是当我在气流模板(例如 PythonOperator/BashOperator)中使用 jinja 时,它可以工作,但我无法让它以任务流 API 形式工作。该变量被读取为字符串文字。例子:

# Pretend DAG defined here

@task
def example_task():
    s3_bucket = '{{ var.value.bucket_name }}'
    print(s3_bucket)

example_task()
Run Code Online (Sandbox Code Playgroud)

上面的代码将打印“{{ var.value.bucket_name }}”而不是bucket_name值。

airflow airflow-taskflow

6
推荐指数
1
解决办法
8092
查看次数

标签 统计

airflow ×1

airflow-taskflow ×1