小编Ani*_*nia的帖子

在 Airflow 中的自定义运算符中使用 jinja 模板读取 sql 文件

我正在尝试读取包含 Airflow 中自定义运算符中的 jinja 模板查询的 sql 文件。我已经使用 PythonOperator 实现了它,它调用我使用过的函数

def execute_query(**kwargs)
    sql_query = open('my_sql_query.sql').read()   #(SELECT * FROM my_table WHERE date > {})
    sql_query.format(kwargs['ds'])
Run Code Online (Sandbox Code Playgroud)

但我更喜欢直接在查询中 使用这种语法{{ ds }}SELECT * FROM my_table WHERE date > {{ ds }}

我做了什么:

  1. 我使用 template_fields 和 template_ext 创建了 CustomOperator
class SQLOperator(BaseOperator):

   template_fields = ['sql']
   template_ext = ('.sql',)

   @apply_defaults
   def __init__(
       self,
       name = None,
       sql = None,
       *args, **kwargs) -> None:
       super().__init__(*args, **kwargs)
       self.name = name
       self.sql = sql

    def execute(self, context):
       print("Name", name) …
Run Code Online (Sandbox Code Playgroud)

sql jinja2 directed-acyclic-graphs airflow

7
推荐指数
1
解决办法
6913
查看次数

标签 统计

airflow ×1

directed-acyclic-graphs ×1

jinja2 ×1

sql ×1