我正在尝试读取包含 Airflow 中自定义运算符中的 jinja 模板查询的 sql 文件。我已经使用 PythonOperator 实现了它,它调用我使用过的函数
def execute_query(**kwargs)
sql_query = open('my_sql_query.sql').read() #(SELECT * FROM my_table WHERE date > {})
sql_query.format(kwargs['ds'])
Run Code Online (Sandbox Code Playgroud)
但我更喜欢直接在查询中
使用这种语法{{ ds }}SELECT * FROM my_table WHERE date > {{ ds }}
我做了什么:
class SQLOperator(BaseOperator):
template_fields = ['sql']
template_ext = ('.sql',)
@apply_defaults
def __init__(
self,
name = None,
sql = None,
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.name = name
self.sql = sql
def execute(self, context):
print("Name", name) …Run Code Online (Sandbox Code Playgroud)