guy*_*yts 1 python sql database pandas amazon-redshift
我有一个巨大的表(超过 100B 记录),我向其中添加了一个空列。如果所需的字符串可用,我会解析来自另一个字段(字符串)的字符串,从该字段中提取一个整数,并希望在具有该字符串的所有行的新列中更新它。
目前,在数据被解析并本地保存在数据帧中后,我对其进行迭代以使用干净的数据更新 Redshift 表。这大约需要 1 秒/迭代,这太长了。
我当前的代码示例:
conn = psycopg2.connect(connection_details)
cur = conn.cursor()
clean_df = raw_data.apply(clean_field_to_parse)
for ind, row in clean_df.iterrows():
update_query = build_update_query(row.id, row.clean_integer1, row.clean_integer2)
cur.execute(update_query)
Run Code Online (Sandbox Code Playgroud)
其中update_query是生成更新查询的函数:
def update_query(id, int1, int2):
query = """
update tab_tab
set
clean_int_1 = {}::int,
clean_int_2 = {}::int,
updated_date = GETDATE()
where id = {}
;
"""
return query.format(int1, int2, id)
Run Code Online (Sandbox Code Playgroud)
其中 clean_df 的结构如下:
id . field_to_parse . clean_int_1 . clean_int_2
1 . {'int_1':'2+1'}. 3 . np.nan
2 . {'int_2':'7-0'}. np.nan . 7
Run Code Online (Sandbox Code Playgroud)
有没有一种方法可以批量更新特定的表字段,这样就不需要一次执行一个查询?
我正在解析字符串并从 Python 运行更新语句。数据库存储在 Redshift 上。
如前所述,考虑纯 SQL 并避免迭代数十亿行,方法是将 Pandas 数据帧推送到 Postgres 作为临时表,然后UPDATE在两个表中运行一个单一的表。使用 SQLAlchemy,您可以用来DataFrame.to_sql创建数据框的表副本。甚至添加连接字段id的索引,并在末尾删除非常大的临时表。
from sqlalchemy import create_engine
engine = create_engine("postgresql+psycopg2://myuser:mypwd!@myhost/mydatabase")
# PUSH TO POSTGRES (SAME NAME AS DF)
clean_df.to_sql(name="clean_df", con=engine, if_exists="replace", index=False)
# SQL UPDATE (USING TRANSACTION)
with engine.begin() as conn:
sql = "CREATE INDEX idx_clean_df_id ON clean_df(id)"
conn.execute(sql)
sql = """UPDATE tab_tab t
SET t.clean_int_1 = c.int1,
t.clean_int_2 = c.int2,
t.updated_date = GETDATE()
FROM clean_df c
WHERE c.id = t.id
"""
conn.execute(sql)
sql = "DROP TABLE IF EXISTS clean_df"
conn.execute(sql)
engine.dispose()
Run Code Online (Sandbox Code Playgroud)