Psycopg2 和 Flask - 将连接绑定到 before_request 和 teardown_appcontext

Flo*_*ian 4 psycopg2 flask

干杯家伙,重构我的瓶的应用程序我被困在追平了数据库连接@app.before_request,并在关闭它@app.teardown_appcontext。我正在使用普通的 Psycopg2 和应用程序工厂模式。

首先,我创建了一个函数来调用应用程序工厂,因此我可以按照Miguel Grinberg 的建议使用 @app此处

def create_app(test_config=None):
    app = Flask(__name__, instance_relative_config=True)

    --

    from shop.db import connect_and_close_db
    connect_and_close_db(app)

    --

    return app
Run Code Online (Sandbox Code Playgroud)

然后我尝试了http://flask.pocoo.org/docs/1.0/appcontext/#storing-data 上建议的这种模式:

def connect_and_close_db(app):

    @app.before_request
    def get_db_test():
        conn_string = "dbname=testdb user=testuser password=test host=localhost"
        if 'db' not in g:
            g.db = psycopg2.connect(conn_string)
        return g.db

    @app.teardown_appcontext
    def close_connection(exception):
        db = g.pop('db', None)

        if db is not None:
            db.close()
Run Code Online (Sandbox Code Playgroud)

结果是:

TypeError: 'psycopg2.extensions.connection' object is not callable
Run Code Online (Sandbox Code Playgroud)

任何人都知道发生了什么以及如何使其发挥作用?

此外,我想知道如何访问连接对象来创建游标,一旦它的创建被绑定到before_request

v25*_*v25 7

这个解决方案可能远非完美,也不是很干。我欢迎在此基础上发表评论或其他答案。

要实现原始psycopg2支持,您可能需要查看连接池程序。关于如何使用 Flask 实现这一点,还有一个很好的指南

基本思想是首先创建您的连接池。您希望在 Flask 应用程序初始化时建立它(这可以在 python 解释器中或通过可能有多个的 gunicorn 工作器 - 在这种情况下,每个工作器都有自己的连接池)。我选择将返回的池存储在配置中:

from flask import Flask, g, jsonify

import psycopg2
from psycopg2 import pool

app = Flask(__name__)

app.config['postgreSQL_pool'] = psycopg2.pool.SimpleConnectionPool(1, 20,
    user = "postgres",
    password = "very_secret",
    host = "127.0.0.1",
    port = "5432",
    database = "postgres")
Run Code Online (Sandbox Code Playgroud)

注意前两个参数SimpleConnectionPoolmin&max连接。这是到您的数据库服务器的连接数,在本例中为bwtween 1& 20

接下来定义一个get_db函数:

def get_db():
    if 'db' not in g:
        g.db = app.config['postgreSQL_pool'].getconn()
    return g.db
Run Code Online (Sandbox Code Playgroud)

SimpleConnectionPool.getconn()这里使用的方法只是从池中返回一个连接,我们将其分配给g.db并返回。这意味着当我们get_db()在代码中的任何地方调用时,它返回相同的连接,或者如果不存在则创建一个连接。不需要before.context装饰器。

请定义您的拆卸功能:

@app.teardown_appcontext
def close_conn(e):
    db = g.pop('db', None)
    if db is not None:
        app.config['postgreSQL_pool'].putconn(db)
Run Code Online (Sandbox Code Playgroud)

这在应用程序上下文被破坏时运行,并用于SimpleConnectionPool.putconn()放置连接。

最后定义一个路由:

@app.route('/')
def index():
    db = get_db()
    cursor = db.cursor()

    cursor.execute("select 1;")
    result = cursor.fetchall()
    print (result)

    cursor.close()
    return jsonify(result)
Run Code Online (Sandbox Code Playgroud)

此代码适用于我针对在 docker 容器中运行的 postgres 进行测试。可能应该改进的一些事情:

  • 这种观点不是很枯燥。也许您可以将其中的一些移动到get_db函数中,以便它返回一个游标。(!!!)

  • 当python解释器退出时,您还应该找到关闭与 app.config['postgreSQL_pool'].closeall

  • 尽管测试了某种监视池的方法会很好,这样您就可以在负载下观察池/数据库连接并确保池程序按预期运行。

(!!!) 在另一个领域,sqlalchemy.scoped_session 文档解释了更多与此相关的事情,并提供了一些关于其“会话”如何与请求相关的理论。他们以一种您可以调用的方式实现了它,Session.query('SELECT 1')如果会话不存在,它将创建会话。


编辑:这是您的应用工厂模式的要点,以及评论中的示例用法。