Rus*_*uss 54 python sqlalchemy upsert
我有一个记录,我希望它存在于数据库中,如果它不存在,如果它已存在(主键存在)我希望字段更新到当前状态.这通常称为upsert.
以下不完整的代码片段演示了什么可行,但它似乎过于笨重(特别是如果有更多的列).什么是更好/最好的方式?
Base = declarative_base()
class Template(Base):
__tablename__ = 'templates'
id = Column(Integer, primary_key = True)
name = Column(String(80), unique = True, index = True)
template = Column(String(80), unique = True)
description = Column(String(200))
def __init__(self, Name, Template, Desc):
self.name = Name
self.template = Template
self.description = Desc
def UpsertDefaultTemplate():
sess = Session()
desired_default = Template("default", "AABBCC", "This is the default template")
try:
q = sess.query(Template).filter_by(name = desiredDefault.name)
existing_default = q.one()
except sqlalchemy.orm.exc.NoResultFound:
#default does not exist yet, so add it...
sess.add(desired_default)
else:
#default already exists. Make sure the values are what we want...
assert isinstance(existing_default, Template)
existing_default.name = desired_default.name
existing_default.template = desired_default.template
existing_default.description = desired_default.description
sess.flush()
Run Code Online (Sandbox Code Playgroud)
这样做有更好或更简洁的方法吗?像这样的东西会很棒:
sess.upsert_this(desired_default, unique_key = "name")
Run Code Online (Sandbox Code Playgroud)
尽管unique_keykwarg显然是不必要的(ORM应该能够很容易地解决这个问题)但我之所以添加它只是因为SQLAlchemy倾向于只使用主键.例如:我一直在研究Session.merge是否适用,但这仅适用于主键,在这种情况下,主键是一个自动增量id,对于此目的并不是非常有用.
此示例用例就是在启动可能已升级其默认预期数据的服务器应用程序时.即:此upsert没有并发问题.
wbe*_*rry 44
SQLAlchemy确实有一个"保存或更新"行为,在最近的版本中已经内置了session.add,但之前是单独的session.saveorupdate调用.这不是一个"upsert",但它可能足以满足您的需求.
你问一个有多个唯一键的课是件好事.我相信这正是没有一种正确方法可以做到这一点的原因.主键也是唯一键.如果没有唯一约束,只有主键,那就足够简单:如果不存在给定ID,或者如果ID为None,则创建一个新记录; 否则使用该主键更新现有记录中的所有其他字段.
但是,当存在其他独特约束时,这种简单方法存在逻辑问题.如果要"upsert"一个对象,并且对象的主键与现有记录匹配,但另一个唯一列与另一个记录匹配,那么您要做什么?同样,如果主键不匹配现有记录,但另一个唯一列确实匹配现有记录,那么什么?对于您的特定情况,可能有正确的答案,但总的来说,我认为没有一个正确的答案.
这就是没有内置"upsert"操作的原因.应用程序必须定义每种特定情况下的含义.
P.R*_*.R. 21
SQLAlchemy的支持ON CONFLICT,现在有两种方法on_conflict_do_update()和on_conflict_do_nothing():
从文档中复制:
from sqlalchemy.dialects.postgresql import insert
stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
index_elements=[my_table.c.user_email],
index_where=my_table.c.user_email.like('%@gmail.com'),
set_=dict(data=stmt.excluded.data)
)
conn.execute(stmt)
Run Code Online (Sandbox Code Playgroud)
我使用"先看你跳跃"的方法:
# first get the object from the database if it exists
# we're guaranteed to only get one or zero results
# because we're filtering by primary key
switch_command = session.query(Switch_Command).\
filter(Switch_Command.switch_id == switch.id).\
filter(Switch_Command.command_id == command.id).first()
# If we didn't get anything, make one
if not switch_command:
switch_command = Switch_Command(switch_id=switch.id, command_id=command.id)
# update the stuff we care about
switch_command.output = 'Hooray!'
switch_command.lastseen = datetime.datetime.utcnow()
session.add(switch_command)
# This will generate either an INSERT or UPDATE
# depending on whether we have a new object or not
session.commit()
Run Code Online (Sandbox Code Playgroud)
优点是这是数据库中立的,我认为很清楚.缺点是在如下情况下存在潜在的竞争条件:
switch_command,但没有找到switch_commandswitch_command与我们相同的主键switch_command如今,SQLAlchemy提供了两个有用的功能on_conflict_do_nothing和on_conflict_do_update。这些功能很有用,但是需要您从ORM界面切换到较低级别的一个SQLAlchemy Core。
尽管这两个功能使使用SQLAlchemy语法进行加衬的难度不那么困难,但是这些功能远不能为加衬提供完整的现成解决方案。
我的常见用例是在单个SQL查询/会话执行中插入大量行。我通常会遇到两个问题:
例如,我们已经习惯了更高级别的ORM功能。您不能使用ORM对象,而必须ForeignKey在插入时提供。
我用这下面的函数我写来处理这两个问题:
def upsert(session, model, rows):
table = model.__table__
stmt = postgresql.insert(table)
primary_keys = [key.name for key in inspect(table).primary_key]
update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
if not update_dict:
raise ValueError("insert_or_update resulted in an empty update_dict")
stmt = stmt.on_conflict_do_update(index_elements=primary_keys,
set_=update_dict)
seen = set()
foreign_keys = {col.name: list(col.foreign_keys)[0].column for col in table.columns if col.foreign_keys}
unique_constraints = [c for c in table.constraints if isinstance(c, UniqueConstraint)]
def handle_foreignkeys_constraints(row):
for c_name, c_value in foreign_keys.items():
foreign_obj = row.pop(c_value.table.name, None)
row[c_name] = getattr(foreign_obj, c_value.name) if foreign_obj else None
for const in unique_constraints:
unique = tuple([const,] + [row[col.name] for col in const.columns])
if unique in seen:
return None
seen.add(unique)
return row
rows = list(filter(None, (handle_foreignkeys_constraints(row) for row in rows)))
session.execute(stmt, rows)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
32295 次 |
| 最近记录: |