我使用airflow python 运算符对 redshift/postgres 数据库执行 sql 查询。为了进行调试,我希望 DAG 返回 sql 执行的结果,类似于在控制台本地执行时看到的结果:
我用来psycop2创建连接/游标并执行 sql。记录下来对于确认解析的参数化 SQL 以及确认数据确实已插入非常有帮助(我曾经痛苦地经历过环境差异导致意外行为的问题)
我对气流或 python DBAPI 的低级工作原理没有深入的了解,但文档pscyopg2似乎确实提到了一些可能允许这样做的方法和连接配置。
我发现非常令人困惑的是,这很难做到,因为我想象这将是在此平台上运行 ETL 的主要用例。我听说过简单地创建额外的任务来查询表之前和之后的建议,但这似乎笨拙且无效。
谁能解释一下这如何可能,如果不可能,请解释为什么?欢迎实现类似结果的替代方法。谢谢!
到目前为止我已经尝试过该connection.status_message()方法,但它似乎只返回sql的第一行而不是结果。我还尝试创建一个日志游标,它生成 sql,但不生成控制台结果
import logging
import psycopg2 as pg
from psycopg2.extras import LoggingConnection
conn = pg.connect(
connection_factory=LoggingConnection,
...
)
conn.autocommit = True
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler(sys.stdout))
conn.initialize(logger)
cur = conn.cursor()
sql = """
INSERT INTO mytable (
SELECT *
FROM other_table
);
"""
cur.execute(sql)
Run Code Online (Sandbox Code Playgroud)
我希望记录器返回类似以下内容:
sql> INSERT INTO mytable (
SELECT ...
[2019-07-25 23:00:54] 912 rows affected in 4 s 442 ms
Run Code Online (Sandbox Code Playgroud)
假设您正在编写一个使用 postgres hook 在 sql 中执行某些操作的运算符。
操作员内部打印的任何内容都会被记录。
因此,如果您想记录该语句,只需在运算符中打印该语句即可。
print(sql)
Run Code Online (Sandbox Code Playgroud)
如果您想记录结果,请获取结果并打印结果。例如
result = cur.fetchall()
for row in result:
print(row)
Run Code Online (Sandbox Code Playgroud)
或者,您可以使用self.log.info代替 print,其中 self 指的是运算符实例。
| 归档时间: |
|
| 查看次数: |
4500 次 |
| 最近记录: |