bli*_*say 8 python sql copy psycopg2 pipe
我正在编写一个脚本,使用psycopg2在同一网络上的两台机器之间复制一些数据.我正在用一些旧的,丑陋的bash替换副本
psql -c -h remote.host "COPY table TO STDOUT" | psql -c "COPY table FROM STDIN"
Run Code Online (Sandbox Code Playgroud)
这似乎是最简单,最有效的复制方式.使用stringIO或临时文件在python中复制很容易,如下所示:
buf = StringIO()
from_curs = from_conn.cursor()
to_curs = to_conn.cursor()
from_curs.copy_expert("COPY table TO STDOUT", buf)
buf.seek(0, os.SEEK_SET)
to_curs.copy_expert("COPY table FROM STDIN", buf)
Run Code Online (Sandbox Code Playgroud)
...但这涉及将所有数据保存到磁盘/内存中.
有没有人想出一种方法来模仿像这样的副本中的Unix管道的行为?我似乎无法找到一个不涉及POpen的unix-pipe对象 - 也许最好的解决方案就是使用POpen和subprocess,毕竟.
Ary*_*rog 13
您必须将其中一个调用放在一个单独的线程中.我刚刚意识到你可以使用 os.pipe(),这使得其余的非常简单:
#!/usr/bin/python
import psycopg2
import os
import threading
fromdb = psycopg2.connect("dbname=from_db")
todb = psycopg2.connect("dbname=to_db")
r_fd, w_fd = os.pipe()
def copy_from():
cur = todb.cursor()
cur.copy_from(os.fdopen(r_fd), 'table')
cur.close()
todb.commit()
to_thread = threading.Thread(target=copy_from)
to_thread.start()
cur = fromdb.cursor()
write_f = os.fdopen(w_fd, 'w')
cur.copy_to(write_f, 'table')
write_f.close() # or deadlock...
to_thread.join()
Run Code Online (Sandbox Code Playgroud)
您可以使用已子类化的双端队列来支持读取和写入:
from collections import deque
from Exceptions import IndexError
class DequeBuffer(deque):
def write(self, data):
self.append(data)
def read(self):
try:
return self.popleft()
except IndexError:
return ''
buf = DequeBuffer()
Run Code Online (Sandbox Code Playgroud)
如果读取器比写入器快得多,并且表很大,那么它deque仍然会变大,但它会比存储整个数据要小。
另外,我不确定return ''何时为deque空是安全的,而不是重试直到它不为空,但我猜它是安全的。让我知道它是否有效。
请记住,del buf当您确定复制已完成时,尤其是当脚本此时不只是退出时。