Pau*_*ung 6 airflow airflow-taskflow
我当前正在访问气流变量,如下所示:
from airflow.models import Variable
s3_bucket = Variable.get('bucket_name')
Run Code Online (Sandbox Code Playgroud)
它有效,但我被要求不要使用变量模块并使用 jinja 模板代替(即):
s3_bucket = '{{ var.value.bucket_name }}'
Run Code Online (Sandbox Code Playgroud)
问题是当我在气流模板(例如 PythonOperator/BashOperator)中使用 jinja 时,它可以工作,但我无法让它以任务流 API 形式工作。该变量被读取为字符串文字。例子:
# Pretend DAG defined here
@task
def example_task():
s3_bucket = '{{ var.value.bucket_name }}'
print(s3_bucket)
example_task()
Run Code Online (Sandbox Code Playgroud)
上面的代码将打印“{{ var.value.bucket_name }}”而不是bucket_name值。
它可以工作,但我被要求不要使用变量模块并使用 jinja 模板代替
这不是准确的建议,我将解释原因。
这样做绝对没有问题:
@task
def example_task():
s3_bucket = Variable.get('bucket_name')
print(s3_bucket)
example_task()
Run Code Online (Sandbox Code Playgroud)
Variable.get()您应该避免在顶级代码中使用。在调用的 python 可调用中使用它PythonOperator是完全安全的。
Airflow 不断解析您的.py文件以搜索 DAG 中的更改。这也意味着您作为顶级编写的任何代码都会在解析过程运行时执行。由于解析每 30 秒执行一次(默认值min_file_process_interval),这会给您的后端元存储带来压力。现在考虑一下,您的实例正在随着越来越多的 DAG 使用相同的方法而增长 - 您最终可能会由于容量过大而无法访问数据库。您实际上是在“攻击”您自己的数据库。这导致建议使用宏,就像使用宏一样,您永远不会面临对数据库造成压力的风险,因为宏仅在运行时进行评估。但这并不意味着您应该避免使用Variable.get()在有用时避免使用它。如果您没有正确使用宏,您将收到语法错误(就像您所经历的那样)。
澄清一下 - 可以Variable.get()在任何非顶级代码的代码部分中使用。
| 归档时间: |
|
| 查看次数: |
8092 次 |
| 最近记录: |