Fra*_*iou 4 python csv postgresql stdin psycopg2
我想要做
" on conflict (time) do update set name , description "
Run Code Online (Sandbox Code Playgroud)
但是我不知道何时将stdin与csv一起使用,我不知道什么名字等于什么?和描述等于...
table_a:
xxx.csv:
with open('xxx/xxx.csv', 'r', encoding='utf8') as f:
sql = """
COPY table_a FROM STDIN With CSV on conflict (time)
do update set name=??, description=??;
"""
cur.copy_expert(sql, f)
conn.commit()
Run Code Online (Sandbox Code Playgroud)
小智 7
我已经成功使用以下功能完成了批量更新插入(欢迎提出建议):
import io
from sqlalchemy.engine import Engine
from sqlalchemy.ext import declarative_base
BaseModel = declarative_base()
def upsert_bulk(engine: Engine, model: BaseModel, data: io.StringIO) -> None:
"""
Fast way to upsert multiple entries at once
:param `db`: DB Session
:param `data`: CSV in a stream object
"""
table_name = model.__tablename__
temp_table_name = f"temp_{table_name}"
columns = [c.key for c in model.__table__.columns]
# Select only columns to be updated (in my case, all non-id columns)
variable_columns = [c for c in columns if c != "id"]
# Create string with set of columns to be updated
update_set = ", ".join([f"{v}=EXCLUDED.{v}" for v in variable_columns])
# Rewind data and prepare it for `copy_from`
data.seek(0)
with conn.cursor() as cur:
# Creates temporary empty table with same columns and types as
# the final table
cur.execute(
f"""
CREATE TEMPORARY TABLE {temp_table_name} (LIKE {table_name})
ON COMMIT DROP
"""
)
# Copy stream data to the created temporary table in DB
cur.copy_from(data, temp_table_name)
# Inserts copied data from the temporary table to the final table
# updating existing values at each new conflict
cur.execute(
f"""
INSERT INTO {table_name}({', '.join(columns)})
SELECT * FROM {temp_table_name}
ON CONFLICT (id) DO UPDATE SET {update_set}
"""
)
# Drops temporary table (I believe this step is unnecessary,
# but tables sizes where growing without any new data modifications
# if this command isn't executed)
cur.execute(f"DROP TABLE {temp_table_name}")
# Commit everything through cursor
conn.commit()
conn.close()
Run Code Online (Sandbox Code Playgroud)
感谢每个主人的解决方案。
这是我的解决方案。
sql = """
CREATE TABLE temp_h (
time ,
name,
description
);
COPY temp_h FROM STDIN With CSV;
INSERT INTO table_a(time, name, description)
SELECT *
FROM temp_h ON conflict (time)
DO update set name=EXCLUDED.name, description=EXCLUDED.description;
DROP TABLE temp_h;
"""
Run Code Online (Sandbox Code Playgroud)
在这篇SO帖子中,有两个答案-结合在一起-为成功使用提供了一个不错的解决方案ON CONFLICT。下面的示例使用 ON CONFLICT DO NOTHING;:
CREATE TEMP TABLE tmp_table
(LIKE label INCLUDING DEFAULTS)
ON COMMIT DROP;
COPY tmp_table FROM 'full/file/name/here';
INSERT INTO main_table
SELECT *
FROM tmp_table
ON CONFLICT DO NOTHING;
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2858 次 |
| 最近记录: |