气流通过 Snowflake session_parameters

Cyp*_*pho 4 python airflow snowflake-cloud-data-platform

正如您在代码示例中看到的,我试图通过 Airflow DAG 任务将 session_parameters 传递到我的 Snowflake 连接,但该参数没有被拾取,有什么解决方案吗?

task = SnowflakeOperator(
    task_id='Task',
    sql="CREATE OR REPLACE TABLE MY_DB.MY_SCHEMA.MY_TABLE (test VARCHAR)",
    session_parameters={
        "QUERY_TAG": "my_tag"
    },
    snowflake_conn_id="snowflake_connection",
    warehouse="MY_WH",
    dag=dag,
)
Run Code Online (Sandbox Code Playgroud)

Ela*_*lad 6

您可能使用的是过时版本的雪花运算符。PRsession_parameters 中添加了支持


因为Airflow>=2.3.0你需要拥有:

pip install apache-airflow-providers-common-sql
pip install apache-airflow-providers-snowflake
Run Code Online (Sandbox Code Playgroud)

然后你可以使用你的代码:

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

task = SQLExecuteQueryOperator(
    task_id='Task',
    conn_id="snowflake_connection",
    sql="CREATE OR REPLACE TABLE MY_DB.MY_SCHEMA.MY_TABLE (test VARCHAR)",
    hook_params={
        "session_parameters": {"QUERY_TAG": "my_tag"},
        "warehouse": "MY_WH"
    },
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

SQLExecuteQueryOperator通用运算符。它在底层用于SnowflakeHook与 Snowflake 交互。


因为2.0.0<=Airflow<2.3.0你需要拥有:

pip install apache-airflow-providers-snowflake
Run Code Online (Sandbox Code Playgroud)

然后你可以使用你的代码:

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
task = SnowflakeOperator(
    task_id='Task',
    sql="CREATE OR REPLACE TABLE MY_DB.MY_SCHEMA.MY_TABLE (test VARCHAR)",
    session_parameters={
        "QUERY_TAG": "my_tag"
    },
    snowflake_conn_id="snowflake_connection",
    warehouse="MY_WH",
    dag=dag,
)
Run Code Online (Sandbox Code Playgroud)

请注意,此方法SnowflakeOperator弃用,但也适用于较新版本的 Airflow,直到该操作员从 Snowflake 提供商中删除。


因为Airflow<2.0.0你需要拥有:

pip install apache-airflow-backport-providers-snowflake>='2020.11.23'
Run Code Online (Sandbox Code Playgroud)