Cyp*_*pho 4 python airflow snowflake-cloud-data-platform
正如您在代码示例中看到的,我试图通过 Airflow DAG 任务将 session_parameters 传递到我的 Snowflake 连接,但该参数没有被拾取,有什么解决方案吗?
task = SnowflakeOperator(
task_id='Task',
sql="CREATE OR REPLACE TABLE MY_DB.MY_SCHEMA.MY_TABLE (test VARCHAR)",
session_parameters={
"QUERY_TAG": "my_tag"
},
snowflake_conn_id="snowflake_connection",
warehouse="MY_WH",
dag=dag,
)
Run Code Online (Sandbox Code Playgroud)
您可能使用的是过时版本的雪花运算符。PRsession_parameters
中添加了支持
因为Airflow>=2.3.0
你需要拥有:
pip install apache-airflow-providers-common-sql
pip install apache-airflow-providers-snowflake
Run Code Online (Sandbox Code Playgroud)
然后你可以使用你的代码:
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
task = SQLExecuteQueryOperator(
task_id='Task',
conn_id="snowflake_connection",
sql="CREATE OR REPLACE TABLE MY_DB.MY_SCHEMA.MY_TABLE (test VARCHAR)",
hook_params={
"session_parameters": {"QUERY_TAG": "my_tag"},
"warehouse": "MY_WH"
},
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
是SQLExecuteQueryOperator
通用运算符。它在底层用于SnowflakeHook
与 Snowflake 交互。
因为2.0.0<=Airflow<2.3.0
你需要拥有:
pip install apache-airflow-providers-snowflake
Run Code Online (Sandbox Code Playgroud)
然后你可以使用你的代码:
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
task = SnowflakeOperator(
task_id='Task',
sql="CREATE OR REPLACE TABLE MY_DB.MY_SCHEMA.MY_TABLE (test VARCHAR)",
session_parameters={
"QUERY_TAG": "my_tag"
},
snowflake_conn_id="snowflake_connection",
warehouse="MY_WH",
dag=dag,
)
Run Code Online (Sandbox Code Playgroud)
请注意,此方法SnowflakeOperator
已弃用,但也适用于较新版本的 Airflow,直到该操作员从 Snowflake 提供商中删除。
因为Airflow<2.0.0
你需要拥有:
pip install apache-airflow-backport-providers-snowflake>='2020.11.23'
Run Code Online (Sandbox Code Playgroud)