小编Sha*_*pta的帖子

Airflow 中外部连接的连接池

我正在尝试寻找一种方法来管理 Airflow 中创建的外部连接的连接池。
Airflow 版本:2.1.0
Python 版本:3.9.5
Airflow DB:SQLite
创建的外部连接:MySQL 和 Snowflake

我知道airflow.cfg 文件中有属性

sql_alchemy_pool_enabled = True  
sql_alchemy_pool_size = 5
Run Code Online (Sandbox Code Playgroud)

但这些属性用于管理气流内部数据库,在我的例子中是 SQLite。

我有一些在 MySQL 和 Snowflake 中读取或写入数据的任务。

snowflake_insert = SnowflakeOperator(
          task_id='insert_snowflake',
          dag=dag,
          snowflake_conn_id=SNOWFLAKE_CONN_ID,
          sql="Some Insert query",
          warehouse=SNOWFLAKE_WAREHOUSE,
          database=SNOWFLAKE_DATABASE,
          schema=SNOWFLAKE_SCHEMA,
          role=SNOWFLAKE_ROLE
     )
Run Code Online (Sandbox Code Playgroud)

insert_mysql_task = MySqlOperator(task_id='insert_record', mysql_conn_id='mysql_default', sql="some insert query", dag=dag)
Run Code Online (Sandbox Code Playgroud)

从 MySQL 读取数据

def get_records():
    mysql_hook = MySqlHook(mysql_conn_id="mysql_default")
    records = mysql_hook.get_records(sql=r"""Some select query""")
    print(records)
Run Code Online (Sandbox Code Playgroud)

我观察到的是,Snowflake 的每个任务(同一个 dag 中有多个任务)都会创建一个新会话,但尚未验证 MySQL 的情况是否相同。

有没有办法维护外部连接的连接池(在我的例子中是 Snowflake 和 MySQL)或任何其他方式来在同一会话中的同一 DAG 中运行所有查询?

谢谢

python mysql snowflake-cloud-data-platform airflow-scheduler airflow-2.x

5
推荐指数
1
解决办法
3091
查看次数