小编Dan*_*Dan的帖子

Airflow + pandas read_sql_query() 和提交

问题

我可以使用 read_sql() 将 SQL 事务提交到数据库吗?

用例和背景

我有一个用例,我希望允许用户执行一些预定义的 SQL 并返回 pandas 数据帧。在某些情况下,此 SQL 需要查询预先填充的表,而在其他情况下,此 SQL 将执行一个函数,该函数将写入表,然后查询该表。此逻辑当前包含在 Airflow DAG 的方法内部,以便利用 PostgresHook 可访问 Airflow 的数据库连接信息 - 该方法最终在 PythonOperator 任务中调用。通过测试,我了解到 PostgresHook 创建了一个 psycopg2 连接对象。

代码

from airflow.hooks.postgres_hook import PostgresHook
import pandas as pd 

def create_df(job_id,other_unrelated_inputs):
    conn = job_type_to_connection(job_type) # method that helps choose a database
    sql = open('/sql_files/job_id_{}.sql'.format(job_id)) #chooses arbitrary SQL  
    sql_template = sql.read() 
    hook = PostgresHook(postgres_conn_id=conn) #connection information for alias is predefined elsewhere within Airflow


    try:
        hook_conn_obj = hook.get_conn()
        print(type(hook_conn_obj)) # <class …
Run Code Online (Sandbox Code Playgroud)

sql commit pandas postgresql-8.2 airflow

5
推荐指数
1
解决办法
8053
查看次数

标签 统计

airflow ×1

commit ×1

pandas ×1

postgresql-8.2 ×1

sql ×1