我需要帮助将参数(xcom 从上一个任务推送)传递到 .sql 文件中的 SQL 查询。但是,我无法使用“参数”选项来执行此操作,即使此选项能够渲染上一个任务中的 xcom 值。让我知道我做错了什么。
谢谢 :)
start = EmptyOperator(
task_id="start",
)
fetch_cust_id = PythonOperator(
task_id = "fetch",
python_callable = lambda: 'C001',
)
update_orders = MySqlOperator(
task_id="update",
mysql_conn_id="mysql_default",
database="my_db",
sql="/update.sql",
parameters={
"custid": "{{ ti.xcom_pull(task_ids='fetch') }}"
}
)
start >> fetch_cust_id >> update_orders
Run Code Online (Sandbox Code Playgroud)
SQL文件(update.sql):
UPDATE orders
SET placed = 'yes'
WHERE
custid = {{ custid }}
;
Run Code Online (Sandbox Code Playgroud)
:(
用于parameters将“变量”传递给 SqlAlchemy 引擎。在这种情况下,渲染不是在 Airflow 引擎中完成的。如果你想使用它,你需要使用 SqlAlchemy 语法。例子:
sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s",
parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
Run Code Online (Sandbox Code Playgroud)
parameters但在你的情况下,你想要模板化 xcom,所以根本没有理由使用。您希望渲染由 Airflow 完成。
您可以直接在 sql 中设置它,sql因为 sql 是模板字段:
UPDATE orders
SET placed = 'yes'
WHERE custid = "{{ ti.xcom_pull(task_ids='fetch') }}";
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1402 次 |
| 最近记录: |