我的应用程序使用范围会话和SQLALchemy的声明式样式.它是一个Web应用程序,许多数据库插入由Celery任务调度程序执行.
通常,在决定插入对象时,我的代码可能会执行以下操作:
from schema import Session
from schema.models import Bike
pk = 123 # primary key
bike = Session.query(Bike).filter_by(bike_id=pk).first()
if not bike: # no bike in DB
new_bike = Bike(pk, "shiny", "bike")
Session.add(new_bike)
Session.commit()
Run Code Online (Sandbox Code Playgroud)
这里的问题是,由于很多,这是通过异步工人完成的,它是可能的一个工作是,虽然中途插入Bike有id=123,而另一个正在检查它的存在.在这种情况下,第二个worker将尝试插入一个具有相同主键的行,SQLAlchemy将引发一个IntegrityError.
我不能为我的生活找到一个很好的方法来处理这个问题,除了交换Session.commit():
'''schema/__init__.py'''
from sqlalchemy.orm import scoped_session, sessionmaker
Session = scoped_session(sessionmaker())
def commit(ignore=False):
try:
Session.commit()
except IntegrityError as e:
reason = e.message
logger.warning(reason)
if not ignore:
raise e
if "Duplicate entry" in reason:
logger.info("%s already in …Run Code Online (Sandbox Code Playgroud) 使用 Sqlalchemy 对于大型数据集,我想使用 session.add_all() 和 session.commit() 等高效方法插入所有行。我正在寻找一种方法来忽略插入任何引发重复/唯一键错误的行。问题是这些错误仅在 session.commit() 调用时出现,因此无法使该特定行失败并移至下一行。
我见过的最接近的问题在这里:SQLAlchemy -批量插入忽略:“重复条目”;然而,接受的答案建议不要使用批量方法并在每行插入后提交,这非常慢并且会导致大量的 I/O,所以我正在寻找更好的解决方案。