Gra*_*ing 9 python postgresql insert-update dataframe pandas
我有一个 pandas DataFrame,需要将其存储到数据库中。这是我当前用于插入的代码行:
df.to_sql(table,con=engine,if_exists='append',index_label=index_col)
Run Code Online (Sandbox Code Playgroud)
如果我的表中不存在任何行,则此方法可以正常工作df。如果行已存在,我会收到此错误:
sqlalchemy.exc.IntegrityError: (psycopg2.IntegrityError) duplicate key
value violates unique constraint "mypk"
DETAIL: Key (id)=(42) already exists.
[SQL: 'INSERT INTO mytable (id, owner,...) VALUES (%(id)s, %(owner)s,...']
[parameters:...] (Background on this error at: http://sqlalche.me/e/gkpj)
Run Code Online (Sandbox Code Playgroud)
并且没有插入任何内容。
PostgreSQL 有可选ON CONFLICT子句,可用于UPDATE现有表行。我阅读了整个pandas.DataFrame.to_sql 手册页,但找不到任何在函数ON CONFLICT内使用的方法DataFrame.to_sql()。
我考虑过根据数据库表中已有的内容将我的 DataFrame 一分为二。所以现在我有两个 DataFrameinsert_rows和update_rows,我可以安全地执行
insert_rows.to_sql(table, con=engine, if_exists='append', index_label=index_col)
Run Code Online (Sandbox Code Playgroud)
但是,似乎没有UPDATE相当于 的东西DataFrame.to_sql()。那么如何使用 DataFrame 更新表呢update_rows?
Sat*_*tan 23
我知道这是一个旧线程,但我遇到了同样的问题,并且该线程出现在 Google 中。目前还没有一个答案真正令人满意,所以我得出的结论是:
我的解决方案与 zdgriffith 的答案非常相似,但性能更高,因为不需要迭代data_iter:
def postgres_upsert(table, conn, keys, data_iter):
from sqlalchemy.dialects.postgresql import insert
data = [dict(zip(keys, row)) for row in data_iter]
insert_statement = insert(table.table).values(data)
upsert_statement = insert_statement.on_conflict_do_update(
constraint=f"{table.table.name}_pkey",
set_={c.key: c for c in insert_statement.excluded},
)
conn.execute(upsert_statement)
Run Code Online (Sandbox Code Playgroud)
现在您可以在 pandas 的to_sql方法中使用此自定义 upsert 方法,如 zdgriffith 所示。
请注意,我的 upsert 函数使用表的主键约束。您可以通过更改 的constraint参数来定位另一个约束.on_conflict_do_update。
.excluded相关线程上的这个答案解释了更多的使用: /sf/answers/3635487971/
小智 15
@ SaturnFromTitan,感谢您对这个旧线程的回复。这就像魔术一样。我会投票,但我没有代表。
对于那些像我一样对这一切不熟悉的人:您可以剪切并粘贴 SaturnFromTitan 答案并使用以下内容调用它:
df.to_sql('my_table_name',
dbConnection,schema='my_schema',
if_exists='append',
index=False,
method=postgres_upsert)
Run Code Online (Sandbox Code Playgroud)
就是这样。更新插入有效。
为了用一个例子来跟进布伦丹的回答,这对我有用:
import os
import sqlalchemy as sa
import pandas as pd
from sqlalchemy.dialects.postgresql import insert
engine = sa.create_engine(os.getenv("DBURL"))
meta = sa.MetaData()
meta.bind = engine
meta.reflect(views=True)
def upsert(table, conn, keys, data_iter):
upsert_args = {"constraint": "test_table_col_a_col_b_key"}
for data in data_iter:
data = {k: data[i] for i, k in enumerate(keys)}
upsert_args["set_"] = data
insert_stmt = insert(meta.tables[table.name]).values(**data)
upsert_stmt = insert_stmt.on_conflict_do_update(**upsert_args)
conn.execute(upsert_stmt)
if __name__ == "__main__":
df = pd.read_csv("test_data.txt")
with db.engine.connect() as conn:
df.to_sql(
"test_table",
con=conn,
if_exists="append",
method=upsert,
index=False,
)
Run Code Online (Sandbox Code Playgroud)
在此示例中,架构将类似于:
CREATE TABLE test_table(
col_a text NOT NULL,
col_b text NOT NULL,
col_c text,
UNIQUE (col_a, col_b)
)
Run Code Online (Sandbox Code Playgroud)
如果您注意到to_sql文档中提到了一个method需要可调用的参数。创建这个可调用函数应该允许您使用所需的 Postgres 子句。这是他们在文档中提到的可调用示例: https: //pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method
它与您需要的有很大不同,但请遵循传递给此可调用函数的参数。它们将允许您构建常规 SQL 语句。
| 归档时间: |
|
| 查看次数: |
9106 次 |
| 最近记录: |