从 pandas 插入 postgreSQL 表并进行“冲突”更新

Gra*_*ing 9 python postgresql insert-update dataframe pandas

我有一个 pandas DataFrame,需要将其存储到数据库中。这是我当前用于插入的代码行:

df.to_sql(table,con=engine,if_exists='append',index_label=index_col)
Run Code Online (Sandbox Code Playgroud)

如果我的表中不存在任何行,则此方法可以正常工作df。如果行已存在,我会收到此错误:

sqlalchemy.exc.IntegrityError: (psycopg2.IntegrityError) duplicate key
value violates unique constraint "mypk"
DETAIL:  Key (id)=(42) already exists.
 [SQL: 'INSERT INTO mytable (id, owner,...) VALUES (%(id)s, %(owner)s,...']
 [parameters:...] (Background on this error at: http://sqlalche.me/e/gkpj)
Run Code Online (Sandbox Code Playgroud)

并且没有插入任何内容。

PostgreSQL 有可选ON CONFLICT子句,可用于UPDATE现有表行。我阅读了整个pandas.DataFrame.to_sql 手册页,但找不到任何在函数ON CONFLICT内使用的方法DataFrame.to_sql()

我考虑过根据数据库表中已有的内容将我的 DataFrame 一分为二。所以现在我有两个 DataFrameinsert_rowsupdate_rows,我可以安全地执行

insert_rows.to_sql(table, con=engine, if_exists='append', index_label=index_col)
Run Code Online (Sandbox Code Playgroud)

但是,似乎没有UPDATE相当于 的东西DataFrame.to_sql()。那么如何使用 DataFrame 更新表呢update_rows

Sat*_*tan 23

我知道这是一个旧线程,但我遇到了同样的问题,并且该线程出现在 Google 中。目前还没有一个答案真正令人满意,所以我得出的结论是:

我的解决方案与 zdgriffith 的答案非常相似,但性能更高,因为不需要迭代data_iter

def postgres_upsert(table, conn, keys, data_iter):
    from sqlalchemy.dialects.postgresql import insert

    data = [dict(zip(keys, row)) for row in data_iter]

    insert_statement = insert(table.table).values(data)
    upsert_statement = insert_statement.on_conflict_do_update(
        constraint=f"{table.table.name}_pkey",
        set_={c.key: c for c in insert_statement.excluded},
    )
    conn.execute(upsert_statement)
Run Code Online (Sandbox Code Playgroud)

现在您可以在 pandas 的to_sql方法中使用此自定义 upsert 方法,如 zdgriffith 所示。

请注意,我的 upsert 函数使用表的主键约束。您可以通过更改 的constraint参数来定位另一个约束.on_conflict_do_update

.excluded相关线程上的这个答案解释了更多的使用: /sf/answers/3635487971/


小智 15

@ SaturnFromTitan,感谢您对这个旧线程的回复。这就像魔术一样。我会投票,但我没有代表。

对于那些像我一样对这一切不熟悉的人:您可以剪切并粘贴 SaturnFromTitan 答案并使用以下内容调用它:

    df.to_sql('my_table_name', 
              dbConnection,schema='my_schema',
              if_exists='append',
              index=False,
              method=postgres_upsert)  
Run Code Online (Sandbox Code Playgroud)

就是这样。更新插入有效。


zdg*_*ith 5

为了用一个例子来跟进布伦丹的回答,这对我有用:

import os
import sqlalchemy as sa
import pandas as pd
from sqlalchemy.dialects.postgresql import insert


engine = sa.create_engine(os.getenv("DBURL"))
meta = sa.MetaData()
meta.bind = engine
meta.reflect(views=True)


def upsert(table, conn, keys, data_iter):
    upsert_args = {"constraint": "test_table_col_a_col_b_key"}
    for data in data_iter:
        data = {k: data[i] for i, k in enumerate(keys)}
        upsert_args["set_"] = data
        insert_stmt = insert(meta.tables[table.name]).values(**data)
        upsert_stmt = insert_stmt.on_conflict_do_update(**upsert_args)
        conn.execute(upsert_stmt)


if __name__ == "__main__":
    df = pd.read_csv("test_data.txt")
    with db.engine.connect() as conn:
        df.to_sql(
            "test_table",
            con=conn,
            if_exists="append",
            method=upsert,
            index=False,
        )
Run Code Online (Sandbox Code Playgroud)

在此示例中,架构将类似于:

CREATE TABLE test_table(
    col_a text NOT NULL,
    col_b text NOT NULL,
    col_c text,
    UNIQUE (col_a, col_b)
)
Run Code Online (Sandbox Code Playgroud)


Bre*_*tin 3

如果您注意到to_sql文档中提到了一个method需要可调用的参数。创建这个可调用函数应该允许您使用所需的 Postgres 子句。这是他们在文档中提到的可调用示例: https: //pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method

它与您需要的有很大不同,但请遵循传递给此可调用函数的参数。它们将允许您构建常规 SQL 语句。