在SQLAlchemy中,如何从SELECT语句中填充或更新表?
我正在使用SQLAlchemy,我还不确定我将使用哪个数据库,因此我希望尽可能保持与数据库无关.如何在不将自己绑定到特定数据库的情况下将时区感知日期时间对象存储在数据库中?现在,我确定时间是UTC,然后我将它们存储在数据库中,并在显示时转换为本地化,但感觉不够优雅和脆弱.是否有一种与数据库无关的方法来从SQLAlchemy中获取时区感知日期时间而不是从数据库中获取天真的数据时间对象?
我正在尝试使用SQLAlchemy上的声明来实现自引用的多对多关系.
这种关系代表两个用户之间的友谊.在线我发现(在文档和谷歌中)如何建立一个自我参照的m2m关系,在某种程度上角色是有区别的.这意味着在这个m2m关系中,UserA例如是UserB的老板,所以他将他列为"下属"属性或者你有什么.同样,UserB在'上级'下列出UserA.
这没有问题,因为我们可以用这种方式声明一个backref到同一个表:
subordinates = relationship('User', backref='superiors')
Run Code Online (Sandbox Code Playgroud)
当然,那里的'上级'属性在课堂上并不明确.
无论如何,这是我的问题:如果我想反馈到我正在调用backref的相同属性怎么办?像这样:
friends = relationship('User',
secondary=friendship, #this is the table that breaks the m2m
primaryjoin=id==friendship.c.friend_a_id,
secondaryjoin=id==friendship.c.friend_b_id
backref=??????
)
Run Code Online (Sandbox Code Playgroud)
这是有道理的,因为如果A和B交朋友关系角色是相同的,如果我调用B的朋友,我应该得到一个包含A的列表.这是完整的问题代码:
friendship = Table(
'friendships', Base.metadata,
Column('friend_a_id', Integer, ForeignKey('users.id'), primary_key=True),
Column('friend_b_id', Integer, ForeignKey('users.id'), primary_key=True)
)
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
friends = relationship('User',
secondary=friendship,
primaryjoin=id==friendship.c.friend_a_id,
secondaryjoin=id==friendship.c.friend_b_id,
#HELP NEEDED HERE
)
Run Code Online (Sandbox Code Playgroud)
对不起,如果这是太多的文字,我只想尽可能明确地用这个.我似乎无法在网上找到任何参考资料.
我有一个flask-sqlalchemy模型:
class MyModel(db.Model):
__tablename__ = 'targets'
id = db.Column(db.Integer, primary_key=True)
url = db.Column(db.String(2048))
Run Code Online (Sandbox Code Playgroud)
该表已经创建并正在使用中.我想在url属性上创建一个索引,所以我将index = True传递给它:
url = db.Column(db.String(2048), index=True)
Run Code Online (Sandbox Code Playgroud)
如何在不删除和重新创建表的情况下使此索引生效?
我有一个返回TimeOut的Sql Alchemy应用程序:
TimeoutError:达到大小为5的QueuePool限制溢出10,连接超时,超时30
我在另一篇文章中读到,当我不关闭会话时会发生这种情况,但我不知道这是否适用于我的代码:
我在init.py中连接到数据库:
from .dbmodels import (
DBSession,
Base,
engine = create_engine("mysql://" + loadConfigVar("user") + ":" + loadConfigVar("password") + "@" + loadConfigVar("host") + "/" + loadConfigVar("schema"))
#Sets the engine to the session and the Base model class
DBSession.configure(bind=engine)
Base.metadata.bind = engine
Run Code Online (Sandbox Code Playgroud)
然后在另一个python文件中我收集了两个函数中的一些数据,但是使用了我在init.py中初始化的DBSession:
from .dbmodels import DBSession
from .dbmodels import resourcestatsModel
def getFeaturedGroups(max = 1):
try:
#Get the number of download per resource
transaction.commit()
rescount = DBSession.connection().execute("select resource_id,count(resource_id) as total FROM resourcestats")
#Move the data to an array
resources …Run Code Online (Sandbox Code Playgroud) 我有以下查询来检索单列数据:
routes_query = select(
[schema.stop_times.c.route_number],
schema.stop_times.c.stop_id == stop_id
).distinct(schema.stop_times.c.route_number)
result = conn.execute(routes_query)
return [r['route_number'] for r in result]
Run Code Online (Sandbox Code Playgroud)
我想知道是否有一种更清晰的方法来检索返回的数据行的本机列表.
有没有办法为查询对象创建自定义方法,以便您可以这样做?
User.query.all_active()
Run Code Online (Sandbox Code Playgroud)
all_active()基本上在哪里.filter(User.is_active == True)
能够过滤掉它吗?
User.query.all_active().filter(User.age == 30)
Run Code Online (Sandbox Code Playgroud) 我有一个表'test',其列'Name'没有约束.ALTER通过给它一个UNIQUE约束我需要这个专栏.我该怎么办?
我应该使用op.alter_column('???')或create_unique_constraint('???')?新列不是create_unique_constraint而不是现有列吗?
在Flask-SQLAlchemy教程中,定义了User模型的构造函数:
from flask import Flask
from flask.ext.sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////tmp/test.db'
db = SQLAlchemy(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True)
email = db.Column(db.String(120), unique=True)
def __init__(self, username, email):
self.username = username
self.email = email
Run Code Online (Sandbox Code Playgroud)
对于有两列的表,这可能是可以接受的,但如果我有10列以上的表怎么办?每次定义新模型时都必须定义构造函数吗?
我已经搜索了很长时间,但无法找到问题的解决方案.我们将SQLAlchemy与MySQL结合用于我们的项目,我们遇到了几次可怕的错误:
1213,'试图锁定时发现死锁; 尝试重启事务'.
在这种情况下,我们想尝试最多重启三次交易.
我已经开始编写一个装饰器来执行此操作,但我不知道如何在失败之前保存会话状态并在之后重试相同的事务?(因为SQLAlchemy在引发异常时需要回滚)
我到目前为止的工作,
def retry_on_deadlock_decorator(func):
lock_messages_error = ['Deadlock found', 'Lock wait timeout exceeded']
@wraps(func)
def wrapper(*args, **kwargs):
attempt_count = 0
while attempt_count < settings.MAXIMUM_RETRY_ON_DEADLOCK:
try:
return func(*args, **kwargs)
except OperationalError as e:
if any(msg in e.message for msg in lock_messages_error) \
and attempt_count <= settings.MAXIMUM_RETRY_ON_DEADLOCK:
logger.error('Deadlock detected. Trying sql transaction once more. Attempts count: %s'
% (attempt_count + 1))
else:
raise
attempt_count += 1
return wrapper
Run Code Online (Sandbox Code Playgroud) sqlalchemy ×10
python ×9
alembic ×1
datetime ×1
flask ×1
indexing ×1
many-to-many ×1
mysql ×1
session ×1
zope ×1