有没有办法测试SQLAlchemy连接?

mgi*_*gig 9 python sqlalchemy pandas

我正在使用SQLAlchemy连接将pandas DataFrame写入MySQL数据库.在我的代码的早期,我创建了一个SQLAlchemy引擎:

engine = create_my_sqlalchemy_connection()
Run Code Online (Sandbox Code Playgroud)

我执行一些查询,做一些计算,然后尝试使用相同的引擎稍后写入数据库:

df.to_sql('my_table', engine, if_exists='append', index=False)
Run Code Online (Sandbox Code Playgroud)

有时这会起作用,有时连接会在代码准备写入数据库时​​丢失,并且有错误.

我可以尝试一下,除非需要创建一个新连接:

try:
    df.to_sql('my_table', engine, if_exists='append', index=False)
except:
    engine = create_my_sqlalchemy_connection()
    df.to_sql('my_table', engine, if_exists='append', index=False)
Run Code Online (Sandbox Code Playgroud)

但是,我以为我会伸出手去看看是否有人知道更好的方法(例如,如果有一些我不知道的SQLAlchemy方法用于测试以确定连接是否仍然存在).

use*_*267 11

您可以让 SQLAlchemy 使用参数检查连接的活跃度pool_pre_ping: https: //docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine.params.pool_pre_ping

\n\n
\n

如果 True 将启用连接池 \xe2\x80\x9cpre-ping\xe2\x80\x9d 功能,该功能在每次结帐时测试连接的活动性。

\n
\n\n

只需在创建引擎时使用即可启用它。

\n


Nik*_*iya 5

下面的代码片段非常适合我。

from sqlalchemy import text

# check if the connection is successfully established or not
with app.app_context():
    try:
        # db.session.execute('SELECT 1')
        db.session.execute(text('SELECT 1'))
        print('\n\n----------- Connection successful !')
    except Exception as e:
        print('\n\n----------- Connection failed ! ERROR : ', e)

Run Code Online (Sandbox Code Playgroud)

这是完整的代码。

import os, sys, click, urllib
from flask import Flask, jsonify, request
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import text

# Make sure to replace below data with your DB values
DATABASE_HOST = "10.10.10.110"
DATABASE_NAME = "dbtest"
DATABASE_USERNAME = "admin" 
DATABASE_PASSWORD = "admin@123"

app = Flask(__name__)

# to elimate the error, if the password contains special characters like '@' 
DATABASE_PASSWORD_UPDATED = urllib.parse.quote_plus(DATABASE_PASSWORD)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mssql+pymssql://'+DATABASE_USERNAME+':'+DATABASE_PASSWORD_UPDATED+'@'+DATABASE_HOST+'/'+DATABASE_NAME
app.config['SQLALCHEMY_ECHO'] = True
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)

# check if the connection is successfully established or not
with app.app_context():
    try:
        # db.session.execute('SELECT 1')
        db.session.execute(text('SELECT 1'))
        print('\n\n----------- Connection successful !')
    except Exception as e:
        print('\n\n----------- Connection failed ! ERROR : ', e)



if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000, debug=True)

Run Code Online (Sandbox Code Playgroud)


Dan*_*Dan -1

值得尝试Connection.close属性。

if engine.closed:
    engine = create_my_sqlalchemy_connection()
    df.to_sql('my_table', engine, if_exists='append', index=False)
else:
    df.to_sql('my_table', engine, if_exists='append', index=False)
Run Code Online (Sandbox Code Playgroud)

  • <class 'AttributeError'> 'Engine' 对象没有属性 'close' (4认同)