SQLAlchemy - performing a bulk upsert (if exists, update, else insert) in PostgreSQL

mgo*_*ser 30 python postgresql sqlalchemy flask-sqlalchemy

I am trying to write a bulk upsert in Python using the SQLAlchemy module (not in SQL!).

I am getting the following error on a SQLAlchemy add:

sqlalchemy.exc.IntegrityError: (IntegrityError) duplicate key value violates unique constraint "posts_pkey"
DETAIL:  Key (id)=(TEST1234) already exists.

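I have a table called posts with a primary key on the id column.

In this example, I already have a row in the db with id=TEST1234. When I try to db.session.add() a new posts object with the id set to TEST1234, I get the error above. I was under the impression that if the primary key already exists, the record would get updated.

How can I upsert with Flask-SQLAlchemy based on the primary key alone? Is there a simple solution?

If there is not, I can always check for and delete any record with a matching id and then insert the new record, but that seems expensive for my situation, where I do not expect many updates.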

mgo*_*ser 23

There is an upsert-esque operation in SQLAlchemy:

db.session.merge()

After I found this command I was able to perform upserts, but it is worth mentioning that this operation is slow for a bulk "upsert".
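To make the behaviour concrete, here is a minimal sketch of merge() acting as an upsert. It assumes SQLAlchemy 1.4 or later and swaps in an in-memory SQLite database so that it runs without a PostgreSQL server; the Post model and the titles are made up for illustration.

```python
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Post(Base):
    # Hypothetical model mirroring the "posts" table from the question
    __tablename__ = "posts"

    id = Column(String, primary_key=True)
    title = Column(String)


# Assumption: in-memory SQLite stands in for PostgreSQL here
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

# Seed one row so that a later write to the same id would conflict
with Session(engine) as session:
    session.add(Post(id="TEST1234", title="old title"))
    session.commit()

with Session(engine) as session:
    # Existing primary key: merge() loads the row and updates it
    session.merge(Post(id="TEST1234", title="new title"))
    # Unknown primary key: merge() adds a new row
    session.merge(Post(id="TEST5678", title="another post"))
    session.commit()

with Session(engine) as session:
    titles = {post.id: post.title for post in session.query(Post).all()}

print(titles)
```

merge() looks each object up by primary key, typically with one SELECT per object that is not already in the session, which is consistent with it being slow for large batches.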

Another approach is to get the list of primary keys you want to upsert and query the database for any matching ids:

# Imagine that post1, post5, and post1000 are posts objects with ids 1, 5 and 1000 respectively
# The goal is to "upsert" these posts.
# we initialize a dict which maps id to the post object

my_new_posts = {1: post1, 5: post5, 1000: post1000} 

for each in posts.query.filter(posts.id.in_(my_new_posts.keys())).all():
    # Only merge those posts which already exist in the database
    db.session.merge(my_new_posts.pop(each.id))

# Only add those posts which did not exist in the database 
db.session.add_all(my_new_posts.values())

# Now we commit our modifications (merges) and inserts (adds) to the database!
db.session.commit()

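  • merge ain't got no tegridy (9 upvotes)
  • merge does not handle IntegrityError (6 upvotes)
  • The process above is too slow to be usable (2 upvotes)
  • If you are hitting a "duplicate key" error on a unique index, merge will not help; it only works for primary keys (2 upvotes)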

exh*_*uma 13

You can leverage the on_conflict_do_update variant. A simple example would be the following:

from sqlalchemy import Column, Integer, Unicode
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import declarative_base  # SQLAlchemy 1.4+

Base = declarative_base()

class Post(Base):
    """
    A simple class for demonstration
    """

    __tablename__ = "posts"

    id = Column(Integer, primary_key=True)
    title = Column(Unicode)

# Prepare all the values that should be "upserted" to the DB
values = [
    {"id": 1, "title": "mytitle 1"},
    {"id": 2, "title": "mytitle 2"},
    {"id": 3, "title": "mytitle 3"},
    {"id": 4, "title": "mytitle 4"},
]

stmt = insert(Post).values(values)
stmt = stmt.on_conflict_do_update(
    # Let's use the constraint name which was visible in the original post's error message
    constraint="posts_pkey",

    # The columns that should be updated on conflict
    set_={
        "title": stmt.excluded.title
    }
)
# `session` is an existing SQLAlchemy Session (e.g. db.session in Flask-SQLAlchemy)
session.execute(stmt)

See the PG docs for more details (for example, where the term "excluded" comes from).
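To see where "excluded" ends up, the statement can be compiled against the PostgreSQL dialect without connecting to a database. The following is a rough sketch rather than part of the answer: it uses a Core Table named posts as a stand-in for the model, and index_elements on the id column instead of a constraint name.

```python
from sqlalchemy import Column, Integer, MetaData, Table, Unicode
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert

metadata = MetaData()

# Hypothetical Core table standing in for the Post model
posts = Table(
    "posts",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("title", Unicode),
)

stmt = insert(posts).values(
    [
        {"id": 1, "title": "mytitle 1"},
        {"id": 2, "title": "mytitle 2"},
    ]
)
stmt = stmt.on_conflict_do_update(
    # Conflict target given as a column rather than a constraint name
    index_elements=[posts.c.id],
    # stmt.excluded refers to the row that was proposed for insertion
    set_={"title": stmt.excluded.title},
)

# Render the SQL text only; nothing is executed
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

The printed statement ends with an ON CONFLICT ... DO UPDATE SET clause in which the new title is read from excluded, PostgreSQL's name for the row that could not be inserted.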

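Side note on duplicated column names

The code above uses the column names as dict keys both in the values list and in the argument to set_. If a column name is changed in the class definition, it has to be changed everywhere else too, or the code will break. This can be avoided by accessing the column definitions, which makes the code a bit uglier but more robust: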

coldefs = Post.__table__.c

values = [
    {coldefs.id.name: 1, coldefs.title.name: "mytitle 1"},
    ...
]

stmt = stmt.on_conflict_do_update(
    ...
    set_={
        coldefs.title.name: stmt.excluded.title
        ...
    }
)


dan*_*all 6

An alternative approach using the compilation extension (https://docs.sqlalchemy.org/en/13/core/compiler.html):

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def compile_upsert(insert_stmt, compiler, **kwargs):
    """
    converts every SQL insert to an upsert, i.e.
    INSERT INTO test (foo, bar) VALUES (1, 'a')
    becomes:
    INSERT INTO test (foo, bar) VALUES (1, 'a') ON CONFLICT (foo) DO UPDATE SET foo=EXCLUDED.foo, bar=EXCLUDED.bar
    (assuming foo is a primary key)
    :param insert_stmt: Original insert statement
    :param compiler: SQL Compiler
    :param kwargs: optional arguments
    :return: upsert statement
    """
    pk = insert_stmt.table.primary_key
    insert = compiler.visit_insert(insert_stmt, **kwargs)
    ondup = f'ON CONFLICT ({",".join(c.name for c in pk)}) DO UPDATE SET'
    updates = ', '.join(f"{c.name}=EXCLUDED.{c.name}" for c in insert_stmt.table.columns)
    upsert = ' '.join((insert, ondup, updates))
    return upsert

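This should ensure that all insert statements behave as upserts. This implementation assumes the Postgres dialect, but it should be fairly easy to modify for the MySQL dialect.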

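  • Converting *all* inserts to upserts is risky. Sometimes you *need* to get integrity errors to ensure data consistency and avoid accidental overwrites. I would only use this solution if you are 120% aware of all its implications! (2 upvotes)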
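One way to act on that caution is to keep upserts opt-in: build the ON CONFLICT statement only where it is wanted, and leave ordinary inserts untouched. The sketch below is an illustration, not the answer's method. build_upsert is a hypothetical helper, the posts table is made up, and all rows are assumed to share the same keys.

```python
from sqlalchemy import Column, Integer, MetaData, Table, Unicode
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert


def build_upsert(table, rows):
    """Hypothetical helper: upsert `rows` into `table`, keyed on its primary key."""
    stmt = insert(table).values(rows)
    pk_columns = list(table.primary_key.columns)
    pk_names = {col.name for col in pk_columns}

    # Only overwrite columns that the rows actually supply (assumes uniform keys)
    supplied = set(rows[0])
    update_cols = {
        name: stmt.excluded[name]
        for name in sorted(supplied - pk_names)
    }

    if not update_cols:
        # Nothing besides the key was supplied, so skip conflicting rows
        return stmt.on_conflict_do_nothing(index_elements=pk_columns)

    return stmt.on_conflict_do_update(
        index_elements=pk_columns,
        set_=update_cols,
    )


metadata = MetaData()
posts = Table(
    "posts",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("title", Unicode),
)

upsert_sql = str(
    build_upsert(posts, [{"id": 1, "title": "mytitle 1"}]).compile(
        dialect=postgresql.dialect()
    )
)
key_only_sql = str(
    build_upsert(posts, [{"id": 1}]).compile(dialect=postgresql.dialect())
)
print(upsert_sql)
print(key_only_sql)
```

With an existing session, the result would be run as session.execute(build_upsert(posts, rows)). A plain insert(posts) elsewhere in the code still raises IntegrityError on a duplicate key.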