6 python database postgresql psycopg
我目前正在分析维基百科转储文件; 我正在使用python从中提取一堆数据并将其持久化到PostgreSQL数据库中.我总是试图让这个文件变得更快(18GB).为了与PostgreSQL接口,我使用的是psycopg2,但是这个模块似乎模仿了许多其他类似的DBAPI.
无论如何,我有一个关于cursor.executemany(命令,值)的问题; 在我看来,每1000个值左右执行一次executemany比为这500万个值中的每一个调用cursor.execute(命令%值)更好(请确认或纠正我!).
但是,你看,我正在使用executemany将1000行插入到具有UNIQUE完整性约束的表中; 这个约束事先没有在python中验证过,因为这要么一直要求SELECT(这似乎适得其反)或要求我获得超过3 GB的RAM.所有这一切都说,当我的脚本试图通过捕获psycopg2.DatabaseError来插入已存在的行时,我指望Postgres警告我.
当我的脚本检测到这样的非UNIQUE INSERT时,它的connection.rollback()(每次都会产生1000行,并且使得executemany变得毫无价值),然后逐个INSERT所有值.
由于psycopg2的记录很少(因为有很多很棒的模块......),我找不到一个有效且有效的解决方法.我已经将每个executemany INSERTed的值从1000减少到100,以减少每个executemany非UNIQUE INSERT的可能性,但我很确定他们只是告诉psycopg2忽略这些exece或告诉游标继续executemany.
基本上,这似乎是一种解决方案如此容易和流行的问题,我所能做的就是要求了解它.
再次感谢!
小智 8
只需使用psql\copy命令将所有数据复制到临时表中,或使用psycopg cursor.copy_in()方法.然后:
insert into mytable
select * from (
select distinct *
from scratch
) uniq
where not exists (
select 1
from mytable
where mytable.mykey = uniq.mykey
);
Run Code Online (Sandbox Code Playgroud)
这将比任何插入组合更快地进行重复数据删除和运行.
-DG
我遇到了同样的问题,并在这里搜索了很多天来收集大量的提示,以形成一个完整的解决方案.即使问题已经过时,我希望这对其他人有用.
1)忘记删除索引/约束并稍后重新创建它们的好处,好处是边缘或更糟.
2)executemany比执行更好,因为它为你准备了prepare语句.您可以使用如下命令获得相同的结果,以获得300%的速度:
# To run only once:
sqlCmd = """PREPARE myInsert (int, timestamp, real, text) AS
INSERT INTO myBigTable (idNumber, date_obs, result, user)
SELECT $1, $2, $3, $4 WHERE NOT EXISTS
(SELECT 1 FROM myBigTable WHERE (idNumber, date_obs, user)=($1, $2, $4));"""
curPG.execute(sqlCmd)
cptInsert = 0 # To let you commit from time to time
#... inside the big loop:
curPG.execute("EXECUTE myInsert(%s,%s,%s,%s);", myNewRecord)
allreadyExists = (curPG.rowcount < 1)
if not allreadyExists:
cptInsert += 1
if cptInsert % 10000 == 0:
conPG.commit()
Run Code Online (Sandbox Code Playgroud)
此伪表示例对(idNumber,date_obs,user)具有唯一约束.
3)最好的解决方案是使用COPY_FROM和TRIGGER来管理BEFORE INSERT中的唯一键.这给了我36倍的速度.我开始使用500记录/秒的普通插入.并且通过"复制",我获得了超过18,000条记录/秒.使用Psycopg2的Python示例代码:
ioResult = StringIO.StringIO() #To use a virtual file as a buffer
cptInsert = 0 # To let you commit from time to time - Memory has limitations
#... inside the big loop:
print >> ioResult, "\t".join(map(str, myNewRecord))
cptInsert += 1
if cptInsert % 10000 == 0:
ioResult = flushCopyBuffer(ioResult, curPG)
#... after the loop:
ioResult = flushCopyBuffer(ioResult, curPG)
def flushCopyBuffer(bufferFile, cursorObj):
bufferFile.seek(0) # Little detail where lures the deamon...
cursorObj.copy_from(bufferFile, 'myBigTable',
columns=('idNumber', 'date_obs', 'value', 'user'))
cursorObj.connection.commit()
bufferFile.close()
bufferFile = StringIO.StringIO()
return bufferFile
Run Code Online (Sandbox Code Playgroud)
这就是Python部分.现在Postgresql触发器没有异常psycopg2.IntegrityError然后所有COPY命令的记录被拒绝:
CREATE OR REPLACE FUNCTION chk_exists()
RETURNS trigger AS $BODY$
DECLARE
curRec RECORD;
BEGIN
-- Check if record's key already exists or is empty (file's last line is)
IF NEW.idNumber IS NULL THEN
RETURN NULL;
END IF;
SELECT INTO curRec * FROM myBigTable
WHERE (idNumber, date_obs, user) = (NEW.idNumber, NEW.date_obs, NEW.user);
IF NOT FOUND THEN -- OK keep it
RETURN NEW;
ELSE
RETURN NULL; -- Oups throw it or update the current record
END IF;
END;
$BODY$ LANGUAGE plpgsql;
Run Code Online (Sandbox Code Playgroud)
现在将此函数链接到表的触发器:
CREATE TRIGGER chk_exists_before_insert
BEFORE INSERT ON myBigTable FOR EACH ROW EXECUTE PROCEDURE chk_exists();
Run Code Online (Sandbox Code Playgroud)
这似乎是很多工作,但Postgresql是一个非常快速的野兽,因为它不必一遍又一遍地解释SQL.玩得开心.
“当我的脚本检测到这样的非唯一插入时,它会使用connection.rollback()(每次都会生成多达1000行,并且有点使executemany变得毫无价值),然后一一插入所有值。”
这个问题并没有多大意义。
每个 1,000 行的块是否会因行不唯一而失败?
1,000 行中的 1 个块是否会失败(在 5,000 个这样的块中)?如果是这样,那么执行次数对 5,000 人中的 4,999 人有帮助,远非“毫无价值”。
您是否担心这个非独特的插入物?或者你有关于这种情况发生次数的实际统计吗?
如果您已从 1,000 个行块切换到 100 个行块,那么您显然可以确定 1,000 个行块、100 个行块和 1 个行块是否具有性能优势。
请使用实际数据库和不同大小的块实际运行实际程序并发布数字。