如何教SQLAlchemy从断开连接中恢复?

era*_*man 10 python sqlalchemy psycopg2

根据http://docs.sqlalchemy.org/en/rel_0_9/core/pooling.html#disconnect-handling-pessimistic,如果连接池中的条目不再有效,可以检测SQLAlchemy重新连接.我创建了以下测试用例来测试它:

import subprocess
from sqlalchemy import create_engine, event
from sqlalchemy import exc
from sqlalchemy.pool import Pool

@event.listens_for(Pool, "checkout")
def ping_connection(dbapi_connection, connection_record, connection_proxy):
    cursor = dbapi_connection.cursor()
    try:
        print "pinging server"
        cursor.execute("SELECT 1")
    except:
        print "raising disconnect error"
        raise exc.DisconnectionError()
    cursor.close()

engine = create_engine('postgresql://postgres@localhost/test')

connection = engine.connect()

subprocess.check_call(['psql', str(engine.url), '-c',
    "select pg_terminate_backend(pid) from pg_stat_activity " +
    "where pid <> pg_backend_pid() " +
    "and datname='%s';" % engine.url.database],
    stdout=subprocess.PIPE)

result = connection.execute("select 'OK'")
for row in result:
    print "Success!", " ".join(row)
Run Code Online (Sandbox Code Playgroud)

但是没有恢复,我收到了这个例外:

sqlalchemy.exc.OperationalError: (OperationalError) terminating connection due to administrator command
server closed the connection unexpectedly
        This probably means the server terminated abnormally
        before or while processing the request.
Run Code Online (Sandbox Code Playgroud)

由于"ping服务器"打印在终端上,因此可以安全地得出事件监听器已附加的结论.如何教SQLAlchemy从断开连接中恢复?

Ger*_*rat 5

看起来只有当您第一次从池中获取连接时才会调用checkout方法(例如,您的线路)connection = engine.connect()

如果您随后丢失了连接,则必须明确替换它,因此您可以只获取一个新连接,然后重试您的sql:

try:
    result = connection.execute("select 'OK'")
except sqlalchemy.exc.OperationalError:  # may need more exceptions here
    connection = engine.connect()  # grab a new connection
    result = connection.execute("select 'OK'")  # and retry
Run Code Online (Sandbox Code Playgroud)

这对于每一个sql都是很痛苦的,所以你可以使用类似的东西来包装数据库查询:

def db_execute(conn, query):
    try:
        result = conn.execute(query)
    except sqlalchemy.exc.OperationalError:  # may need more exceptions here (or trap all)
        conn = engine.connect()  # replace your connection
        result = conn.execute(query)  # and retry
    return result
Run Code Online (Sandbox Code Playgroud)

下列:

result = db_execute(connection, "select 'OK'")
Run Code Online (Sandbox Code Playgroud)

现在应该成功.

另一种选择是监听invalidate方法,并在那时采取一些措施来替换你的连接.

  • 您似乎已经确认SQLAlchemy文档具有误导性.如果每个查询都需要包装在辅助方法中,则此上下文中的_Pool_没有值会进行异常处理和显式恢复. (3认同)