如何在 Airflow 中从文件执行 SQL 查询?(PostgresSQL 运算符)

Jav*_*más 4 python postgresql jinja2 airflow

我正在使用 PostgresSQL 运算符。任务如下:

emailage_transformations = PostgresOperator(
    task_id = 'emailage_transformations',
    sql = '/home/ubuntu/airflow_ci/current/scripts/antifraud/emailage_transformations.sql',
    postgres_conn_id = 'redshift',
    autocommit = True,
    dag = dag)
Run Code Online (Sandbox Code Playgroud)

首先,文件的内容如下:

select cd_pedido_nr,fraud_score,risk_band,ip_risk_level
into antifraud.stg_emailage_id_pedido
from antifraud.stg_emailage_id_email e
left join antifraud.info_emails i on id_email = cd_email_nr
;
Run Code Online (Sandbox Code Playgroud)

我得到的错误是

jinja2.exceptions.TemplateNotFound: /home/ubuntu/airflow_ci/current/scripts/antifraud/emailage_transformations.sql
Run Code Online (Sandbox Code Playgroud)

因此,我在查询中添加了几个括号以符合 jinja2 模板,现在文件代码为:

{select cd_pedido_nr,fraud_score,risk_band,ip_risk_level
into antifraud.stg_emailage_id_pedido
from antifraud.stg_emailage_id_email e
left join antifraud.info_emails i on id_email = cd_email_nr
;}
Run Code Online (Sandbox Code Playgroud)

但是,我仍然有同样的错误。我该如何解决呢?

y2k*_*ham 7

我认为,正如以下链接所述,您应该template_searchpath向您的 DAG 提供一个,以便它能够选择您的外部文件(SQL 或其他文件)

或者,使外部文件可被发现,例如通过修改AIRFLOW_HOME或通过其他技巧也可以工作