处理SQLAlchemy中插入的重复主键(声明式样式)

Edw*_*rdr 37 python mysql sqlalchemy celery

我的应用程序使用范围会话和SQLALchemy的声明式样式.它是一个Web应用程序,许多数据库插入由Celery任务调度程序执行.

通常,在决定插入对象时,我的代码可能会执行以下操作:

from schema import Session
from schema.models import Bike

pk = 123 # primary key
bike = Session.query(Bike).filter_by(bike_id=pk).first()
if not bike: # no bike in DB
    new_bike = Bike(pk, "shiny", "bike")
    Session.add(new_bike)
    Session.commit()
Run Code Online (Sandbox Code Playgroud)

这里的问题是,由于很多,这是通过异步工人完成的,它是可能的一个工作是,虽然中途插入Bikeid=123,而另一个正在检查它的存在.在这种情况下,第二个worker将尝试插入一个具有相同主键的行,SQLAlchemy将引发一个IntegrityError.

我不能为我的生活找到一个很好的方法来处理这个问题,除了交换Session.commit():

'''schema/__init__.py'''
from sqlalchemy.orm import scoped_session, sessionmaker
Session = scoped_session(sessionmaker())

def commit(ignore=False):
    try:
        Session.commit()
    except IntegrityError as e:
        reason = e.message
        logger.warning(reason)

        if not ignore:
            raise e

        if "Duplicate entry" in reason:
            logger.info("%s already in table." % e.params[0])
            Session.rollback()
Run Code Online (Sandbox Code Playgroud)

然后,Session.commit我现在到处都有schema.commit(ignore=True),我不介意再次插入行.

对我来说,由于字符串检查,这似乎非常脆弱.就像一个FYI,当一个IntegrityError被提升时,它看起来像这样:

(IntegrityError) (1062, "Duplicate entry '123' for key 'PRIMARY'")
Run Code Online (Sandbox Code Playgroud)

所以我插入的主键当然是那样的,Duplicate entry is a cool thing我想我可能会错过IntegrityError那些实际上并不是因为重复的主键.

有没有更好的方法,维护我正在使用的干净的SQLAlchemy方法(而不是开始在字符串中写出语句等.)

Db是MySQL(虽然对于单元测试我喜欢使用SQLite,并且不希望用任何新方法来阻碍这种能力).

干杯!

sir*_*ger 28

如果您使用session.merge(bike)而不是session.add(bike),则不会生成主键错误.在bike将被检索和更新,或者根据需要创建的.

  • 如果您使用合并,如果您同时在不同的会话上进行两次合并,则仍然可以获得完整性错误. (8认同)

joe*_*ker 8

您应该IntegrityError以相同的方式处理:回滚事务,并可选择重试.有些数据库甚至不会让你做任何事情IntegrityError.您还可以在两个冲突事务开始时获取表上的锁,或者如果数据库允许,则获取更细粒度的锁.

使用该with语句显式开始事务,并自动提交(或回滚任何异常):

from schema import Session
from schema.models import Bike

session = Session()
with session.begin():
    pk = 123 # primary key
    bike = session.query(Bike).filter_by(bike_id=pk).first()
    if not bike: # no bike in DB
        new_bike = Bike(pk, "shiny", "bike")
        session.add(new_bike)
Run Code Online (Sandbox Code Playgroud)

  • 在上面的代码中,你可能希望`.first()`改为`.one()`,因为它应该是一个唯一的字段.无论如何,这可能不相关,因为更有趣的观察是答案中的代码引入了竞争条件.在检查记录的存在和添加记录之间,另一个工作者可能已经添加了它.检查`IntegrityError`并在必要时回滚更安全. (3认同)