如何使用SqlAlchemy进行upsert?

Rus*_*uss 54 python sqlalchemy upsert

我有一个记录,我希望它存在于数据库中,如果它不存在,如果它已存在(主键存在)我希望字段更新到当前状态.这通常称为upsert.

以下不完整的代码片段演示了什么可行,但它似乎过于笨重(特别是如果有更多的列).什么是更好/最好的方式?

Base = declarative_base()
class Template(Base):
    __tablename__ = 'templates'
    id = Column(Integer, primary_key = True)
    name = Column(String(80), unique = True, index = True)
    template = Column(String(80), unique = True)
    description = Column(String(200))
    def __init__(self, Name, Template, Desc):
        self.name = Name
        self.template = Template
        self.description = Desc

def UpsertDefaultTemplate():
    sess = Session()
    desired_default = Template("default", "AABBCC", "This is the default template")
    try:
        q = sess.query(Template).filter_by(name = desiredDefault.name)
        existing_default = q.one()
    except sqlalchemy.orm.exc.NoResultFound:
        #default does not exist yet, so add it...
        sess.add(desired_default)
    else:
        #default already exists.  Make sure the values are what we want...
        assert isinstance(existing_default, Template)
        existing_default.name = desired_default.name
        existing_default.template = desired_default.template
        existing_default.description = desired_default.description
    sess.flush()
Run Code Online (Sandbox Code Playgroud)

这样做有更好或更简洁的方法吗?像这样的东西会很棒:

sess.upsert_this(desired_default, unique_key = "name")
Run Code Online (Sandbox Code Playgroud)

尽管unique_keykwarg显然是不必要的(ORM应该能够很容易地解决这个问题)但我之所以添加它只是因为SQLAlchemy倾向于只使用主键.例如:我一直在研究Session.merge是否适用,但这仅适用于主键,在这种情况下,主键是一个自动增量id,对于此目的并不是非常有用.

此示例用例就是在启动可能已升级其默认预期数据的服务器应用程序时.即:此upsert没有并发问题.

wbe*_*rry 44

SQLAlchemy确实有一个"保存或更新"行为,在最近的版本中已经内置了session.add,但之前是单独的session.saveorupdate调用.这不是一个"upsert",但它可能足以满足您的需求.

你问一个有多个唯一键的课是件好事.我相信这正是没有一种正确方法可以做到这一点的原因.主键也是唯一键.如果没有唯一约束,只有主键,那就足够简单:如果不存在给定ID,或者如果ID为None,则创建一个新记录; 否则使用该主键更新现有记录中的所有其他字段.

但是,当存在其他独特约束时,这种简单方法存在逻辑问题.如果要"upsert"一个对象,并且对象的主键与现有记录匹配,但另一个唯一列与另一个记录匹配,那么您要做什么?同样,如果主键不匹配现有记录,但另一个唯一列确实匹配现有记录,那么什么?对于您的特定情况,可能有正确的答案,但总的来说,我认为没有一个正确的答案.

这就是没有内置"upsert"操作的原因.应用程序必须定义每种特定情况下的含义.


P.R*_*.R. 21

SQLAlchemy的支持ON CONFLICT,现在有两种方法on_conflict_do_update()on_conflict_do_nothing():

从文档中复制:

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
)
conn.execute(stmt)
Run Code Online (Sandbox Code Playgroud)

http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict#insert-on-conflict-upsert

  • 顺便说一句,如果您只想更新所有排除的列,stmt.excluded 是一个用作映射的 ColumnCollection,因此您可以简单地说 `set_=stmt.excluded` (3认同)
  • [on_duplicate_key_update](https://docs.sqlalchemy.org/en/latest/dialects/mysql.html?highlight=upsert#insert-on-duplicate-key-update-upsert)也支持MySQL (2认同)
  • 这段代码是的,我想(答案是 3 岁以上),但也许 Michaels 的评论适用于 MySQL。一般来说,我的(这个)答案有点草率地得出结论:postgres 被用作数据库。这不是很好,因为它并没有真正回答所提出的一般问题。但根据我得到的赞成票,我认为它对某些人有用,所以我放弃了它。 (2认同)
  • 为什么我们在 set_ 中排除?set_=dict(data=stmt.excluded.data) (2认同)

Ben*_*Ben 9

我使用"先看你跳跃"的方法:

# first get the object from the database if it exists
# we're guaranteed to only get one or zero results
# because we're filtering by primary key
switch_command = session.query(Switch_Command).\
    filter(Switch_Command.switch_id == switch.id).\
    filter(Switch_Command.command_id == command.id).first()

# If we didn't get anything, make one
if not switch_command:
    switch_command = Switch_Command(switch_id=switch.id, command_id=command.id)

# update the stuff we care about
switch_command.output = 'Hooray!'
switch_command.lastseen = datetime.datetime.utcnow()

session.add(switch_command)
# This will generate either an INSERT or UPDATE
# depending on whether we have a new object or not
session.commit()
Run Code Online (Sandbox Code Playgroud)

优点是这是数据库中立的,我认为很清楚.缺点是在如下情况下存在潜在的竞争条件:

  • 我们查询数据库中的a switch_command,但没有找到
  • 我们创造了一个 switch_command
  • 另一个进程或线程创建一个switch_command与我们相同的主键
  • 我们试图承诺我们的 switch_command

  • upsert 的整个目标是避免这里描述的竞争条件。 (17认同)

Nir*_*Izr 6

如今,SQLAlchemy提供了两个有用的功能on_conflict_do_nothingon_conflict_do_update。这些功能很有用,但是需要您从ORM界面切换到较低级别的一个SQLAlchemy Core

尽管这两个功能使使用SQLAlchemy语法进行加衬的难度不那么困难,但是这些功能远不能为加衬提供完整的现成解决方案。

我的常见用例是在单个SQL查询/会话执行中插入大量行。我通常会遇到两个问题:

例如,我们已经习惯了更高级别的ORM功能。您不能使用ORM对象,而必须ForeignKey在插入时提供。

我用下面的函数我写来处理这两个问题:

def upsert(session, model, rows):
    table = model.__table__
    stmt = postgresql.insert(table)
    primary_keys = [key.name for key in inspect(table).primary_key]
    update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}

    if not update_dict:
        raise ValueError("insert_or_update resulted in an empty update_dict")

    stmt = stmt.on_conflict_do_update(index_elements=primary_keys,
                                      set_=update_dict)

    seen = set()
    foreign_keys = {col.name: list(col.foreign_keys)[0].column for col in table.columns if col.foreign_keys}
    unique_constraints = [c for c in table.constraints if isinstance(c, UniqueConstraint)]
    def handle_foreignkeys_constraints(row):
        for c_name, c_value in foreign_keys.items():
            foreign_obj = row.pop(c_value.table.name, None)
            row[c_name] = getattr(foreign_obj, c_value.name) if foreign_obj else None

        for const in unique_constraints:
            unique = tuple([const,] + [row[col.name] for col in const.columns])
            if unique in seen:
                return None
            seen.add(unique)

        return row

    rows = list(filter(None, (handle_foreignkeys_constraints(row) for row in rows)))
    session.execute(stmt, rows)
Run Code Online (Sandbox Code Playgroud)

  • on_conflict仅适用于支持本机ON CONFLICT封装的后端。因此,只有postgresql (3认同)
  • @cowbert现在,SQLAlchemy还支持[ON DUPLICATE KEY UPDATE](https://docs.sqlalchemy.org/en/latest/dialects/mysql.html?highlight=upsert#insert-on-duplicate-key-update-upsert) MySQL的。 (2认同)