如何在airflow中记录sql执行结果?

Jon*_*han 5 psycopg2 airflow

我使用airflow python 运算符对 redshift/postgres 数据库执行 sql 查询。为了进行调试,我希望 DAG 返回 sql 执行的结果,类似于在控制台本地执行时看到的结果:

我用来psycop2创建连接/游标并执行 sql。记录下来对于确认解析的参数化 SQL 以及确认数据确实已插入非常有帮助(我曾经痛苦地经历过环境差异导致意外行为的问题)

我对气流或 python DBAPI 的低级工作原理没有深入的了解,但文档pscyopg2似乎确实提到了一些可能允许这样做的方法和连接配置。

我发现非常令人困惑的是,这很难做到,因为我想象这将是在此平台上运行 ETL 的主要用例。我听说过简单地创建额外的任务来查询表之前和之后的建议,但这似乎笨拙且无效。

谁能解释一下这如何可能,如果不可能,请解释为什么?欢迎实现类似结果的替代方法。谢谢!

到目前为止我已经尝试过该connection.status_message()方法,但它似乎只返回sql的第一行而不是结果。我还尝试创建一个日志游标,它生成 sql,但不生成控制台结果

import logging
import psycopg2 as pg
from psycopg2.extras import LoggingConnection

conn = pg.connect(
    connection_factory=LoggingConnection,
    ...
)
conn.autocommit = True

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler(sys.stdout))
conn.initialize(logger)

cur = conn.cursor()

sql = """    
    INSERT INTO mytable (
    SELECT *
    FROM other_table
    );
"""

cur.execute(sql)
Run Code Online (Sandbox Code Playgroud)

我希望记录器返回类似以下内容:

sql> INSERT INTO mytable (
     SELECT ...
[2019-07-25 23:00:54] 912 rows affected in 4 s 442 ms
Run Code Online (Sandbox Code Playgroud)

cho*_*rbs 1

假设您正在编写一个使用 postgres hook 在 sql 中执行某些操作的运算符。

操作员内部打印的任何内容都会被记录。

因此,如果您想记录该语句,只需在运算符中打印该语句即可。

print(sql)
Run Code Online (Sandbox Code Playgroud)

如果您想记录结果,请获取结果并打印结果。例如

result = cur.fetchall()
for row in result:
    print(row)
Run Code Online (Sandbox Code Playgroud)

或者,您可以使用self.log.info代替 print,其中 self 指的是运算符实例。