如何在 Airflow 中的 MySqlOperator 中渲染带参数的 .sql 文件?

sar*_*mar 1 mysql airflow

我需要帮助将参数(xcom 从上一个任务推送)传递到 .sql 文件中的 SQL 查询。但是,我无法使用“参数”选项来执行此操作,即使此选项能够渲染上一个任务中的 xcom 值。让我知道我做错了什么。

谢谢 :)

start = EmptyOperator(
            task_id="start",
    )

fetch_cust_id = PythonOperator(
    task_id = "fetch",
    python_callable = lambda: 'C001',
)

update_orders = MySqlOperator(
    task_id="update",
    mysql_conn_id="mysql_default",
    database="my_db",
    sql="/update.sql",
    parameters={
        "custid": "{{ ti.xcom_pull(task_ids='fetch') }}"
    }
)

start >> fetch_cust_id >> update_orders
Run Code Online (Sandbox Code Playgroud)

SQL文件(update.sql):

UPDATE orders
SET placed = 'yes'
WHERE
custid = {{ custid }}
;
Run Code Online (Sandbox Code Playgroud)

:(

在此输入图像描述

Ela*_*lad 5

用于parameters将“变量”传递给 SqlAlchemy 引擎。在这种情况下,渲染不是在 Airflow 引擎中完成的。如果你想使用它,你需要使用 SqlAlchemy 语法。例子:

sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s",
parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
Run Code Online (Sandbox Code Playgroud)

parameters但在你的情况下,你想要模板化 xcom,所以根本没有理由使用。您希望渲染由 Airflow 完成。

您可以直接在 sql 中设置它,sql因为 sql 是模板字段:

UPDATE orders
SET placed = 'yes'
WHERE custid = "{{ ti.xcom_pull(task_ids='fetch') }}";
Run Code Online (Sandbox Code Playgroud)