Edw*_*rdr 37 python mysql sqlalchemy celery
我的应用程序使用范围会话和SQLALchemy的声明式样式.它是一个Web应用程序,许多数据库插入由Celery任务调度程序执行.
通常,在决定插入对象时,我的代码可能会执行以下操作:
from schema import Session
from schema.models import Bike
pk = 123 # primary key
bike = Session.query(Bike).filter_by(bike_id=pk).first()
if not bike: # no bike in DB
new_bike = Bike(pk, "shiny", "bike")
Session.add(new_bike)
Session.commit()
Run Code Online (Sandbox Code Playgroud)
这里的问题是,由于很多,这是通过异步工人完成的,它是可能的一个工作是,虽然中途插入Bike有id=123,而另一个正在检查它的存在.在这种情况下,第二个worker将尝试插入一个具有相同主键的行,SQLAlchemy将引发一个IntegrityError.
我不能为我的生活找到一个很好的方法来处理这个问题,除了交换Session.commit():
'''schema/__init__.py'''
from sqlalchemy.orm import scoped_session, sessionmaker
Session = scoped_session(sessionmaker())
def commit(ignore=False):
try:
Session.commit()
except IntegrityError as e:
reason = e.message
logger.warning(reason)
if not ignore:
raise e
if "Duplicate entry" in reason:
logger.info("%s already in table." % e.params[0])
Session.rollback()
Run Code Online (Sandbox Code Playgroud)
然后,Session.commit我现在到处都有schema.commit(ignore=True),我不介意再次插入行.
对我来说,由于字符串检查,这似乎非常脆弱.就像一个FYI,当一个IntegrityError被提升时,它看起来像这样:
(IntegrityError) (1062, "Duplicate entry '123' for key 'PRIMARY'")
Run Code Online (Sandbox Code Playgroud)
所以我插入的主键当然是那样的,Duplicate entry is a cool thing我想我可能会错过IntegrityError那些实际上并不是因为重复的主键.
有没有更好的方法,维护我正在使用的干净的SQLAlchemy方法(而不是开始在字符串中写出语句等.)
Db是MySQL(虽然对于单元测试我喜欢使用SQLite,并且不希望用任何新方法来阻碍这种能力).
干杯!
sir*_*ger 28
如果您使用session.merge(bike)而不是session.add(bike),则不会生成主键错误.在bike将被检索和更新,或者根据需要创建的.
您应该IntegrityError以相同的方式处理:回滚事务,并可选择重试.有些数据库甚至不会让你做任何事情IntegrityError.您还可以在两个冲突事务开始时获取表上的锁,或者如果数据库允许,则获取更细粒度的锁.
使用该with语句显式开始事务,并自动提交(或回滚任何异常):
from schema import Session
from schema.models import Bike
session = Session()
with session.begin():
pk = 123 # primary key
bike = session.query(Bike).filter_by(bike_id=pk).first()
if not bike: # no bike in DB
new_bike = Bike(pk, "shiny", "bike")
session.add(new_bike)
Run Code Online (Sandbox Code Playgroud)