我当前正在访问气流变量,如下所示:
from airflow.models import Variable
s3_bucket = Variable.get('bucket_name')
Run Code Online (Sandbox Code Playgroud)
它有效,但我被要求不要使用变量模块并使用 jinja 模板代替(即):
s3_bucket = '{{ var.value.bucket_name }}'
Run Code Online (Sandbox Code Playgroud)
问题是当我在气流模板(例如 PythonOperator/BashOperator)中使用 jinja 时,它可以工作,但我无法让它以任务流 API 形式工作。该变量被读取为字符串文字。例子:
# Pretend DAG defined here
@task
def example_task():
s3_bucket = '{{ var.value.bucket_name }}'
print(s3_bucket)
example_task()
Run Code Online (Sandbox Code Playgroud)
上面的代码将打印“{{ var.value.bucket_name }}”而不是bucket_name值。