在 Airflow 中的自定义运算符中使用 jinja 模板读取 sql 文件

Ani*_*nia 7 sql jinja2 directed-acyclic-graphs airflow

我正在尝试读取包含 Airflow 中自定义运算符中的 jinja 模板查询的 sql 文件。我已经使用 PythonOperator 实现了它,它调用我使用过的函数

def execute_query(**kwargs)
    sql_query = open('my_sql_query.sql').read()   #(SELECT * FROM my_table WHERE date > {})
    sql_query.format(kwargs['ds'])
Run Code Online (Sandbox Code Playgroud)

但我更喜欢直接在查询中 使用这种语法{{ ds }}SELECT * FROM my_table WHERE date > {{ ds }}

我做了什么:

  1. 我使用 template_fields 和 template_ext 创建了 CustomOperator
class SQLOperator(BaseOperator):

   template_fields = ['sql']
   template_ext = ('.sql',)

   @apply_defaults
   def __init__(
       self,
       name = None,
       sql = None,
       *args, **kwargs) -> None:
       super().__init__(*args, **kwargs)
       self.name = name
       self.sql = sql

    def execute(self, context):
       print("Name", name) # <- works
       print("Query", sql) # <- doesn't work and I don't know how to get the sql file content

Run Code Online (Sandbox Code Playgroud)
  1. 达格
default_args = {...}

dag = DAG(
    'sql_operator_test',
     schedule_interval='0 0 * * *',
     template_searchpath=['/Users/username/airflow/dags/sql/test/'],
     default_args=default_args)

sql_task = SQLOperator(
    task_id='sql_process',
    name="Aaa",
    sql="/Users/username/airflow/dags/sql/test.sql",
    dag=dag)
Run Code Online (Sandbox Code Playgroud)
  1. SQL查询
SELECT * FROM my_table WHERE date > {{ ds }}
Run Code Online (Sandbox Code Playgroud)

我已经没有主意了。是否有任何选项可以将文件传递给操作员或获取其渲染的内容?

Ela*_*lad 3

你的做法没问题。我采用了您的代码并创建了一个工作示例,显示(在{{ ds }}中构建)正在根据需要进行模板化。

创建一个.py文件如下:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SQLOperator(BaseOperator):

   template_fields = ['sql']
   template_ext = ('.sql',)

   @apply_defaults
   def __init__(
           self,
           name = None,
           sql = None,
           *args,
           **kwargs
   ) -> None:

       super().__init__(**kwargs)
       self.name = name
       self.sql = sql

   def execute(self, context):
       print("Name", self.name)  # <- works
       print("Query", self.sql)  # <- Also works :)

default_args = {
    'owner': 'a',
    'start_date': datetime(2020, 3, 24, 2, 0, 0),
}


dag = DAG(
    'sql_operator_test',
     schedule_interval=None,
     default_args=default_args)

sql_task = SQLOperator(
    task_id='sql_process',
    name="Aaa",
    sql="test.sql",
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

文件test.sql为:

SELECT * FROM my_table WHERE date > {{ ds }}
Run Code Online (Sandbox Code Playgroud)

运行它给出:

在此输入图像描述

并从任务日志中:

在此输入图像描述