psycopg2:使用一个查询插入多行

Ser*_*eev 121 python postgresql psycopg2

我需要用一个查询插入多行(行数不是常量),所以我需要像这样执行查询:

INSERT INTO t (a, b) VALUES (1, 2), (3, 4), (5, 6);
Run Code Online (Sandbox Code Playgroud)

我知道的唯一方法是

args = [(1,2), (3,4), (5,6)]
args_str = ','.join(cursor.mogrify("%s", (x, )) for x in args)
cursor.execute("INSERT INTO t (a, b) VALUES "+args_str)
Run Code Online (Sandbox Code Playgroud)

但我想要一些更简单的方法.

小智 202

我构建了一个程序,可以将多行插入到位于另一个城市的服务器上.

我发现使用这种方法的速度比快10倍executemany.在我的例子中tup是一个包含大约2000行的元组.使用此方法花了大约10秒钟:

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str) 
Run Code Online (Sandbox Code Playgroud)

使用此方法时2分钟:

cur.executemany("INSERT INTO table VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s)", tup)
Run Code Online (Sandbox Code Playgroud)

  • 如果有人遇到以下错误:[TypeError:序列项0:预期的str实例,找到的字节]改为运行此命令[args_str =','.join(cur.mogrify("(%s,%s)",x ).decode("utf-8")for t in tup)] (15认同)
  • 两年后仍然非常相关.今天的经验表明,随着您想要推送的行数增加,使用`execute`策略的效果越好.由于这个原因,我看到了大约100倍的加速! (11认同)
  • 也许`executemany`在每次插入后运行一次提交.如果你把整个东西包裹在一个交易中,那么这可能会加快速度吗? (4认同)
  • 刚刚确认了这一改进.从我所读到的psycopg2的`executemany`没有做任何最优化的事情,只是循环并执行许多`execute`语句.使用此方法,远程服务器的700行插入从60秒到<2秒. (4认同)
  • 也许我是偏执狂,但是用`+`连接查询似乎可以打开sql注入,我觉得@Clodoaldo Neto`execute_values()`解决方案更安全. (3认同)
  • 插入件的批处理准备真的非常快,非常感谢此提示。要添加:我有一个','join的值错误,因为mogrify返回的是字节而不是字符串(并且我首先将mogrifiy的结果存储在列表中)。我对迁移结果使用了.decode,这解决了问题。 (2认同)

Clo*_*eto 126

Psycopg 2.7中的新execute_values方法:

data = [(1,'x'), (2,'y')]
insert_query = 'insert into t (a, b) values %s'
psycopg2.extras.execute_values (
    cursor, insert_query, data, template=None, page_size=100
)
Run Code Online (Sandbox Code Playgroud)

在Psycopg 2.6中进行pythonic方式:

data = [(1,'x'), (2,'y')]
records_list_template = ','.join(['%s'] * len(data))
insert_query = 'insert into t (a, b) values {}'.format(records_list_template)
cursor.execute(insert_query, data)
Run Code Online (Sandbox Code Playgroud)

说明:如果要插入的数据是作为元组列表给出的,例如in

data = [(1,'x'), (2,'y')]
Run Code Online (Sandbox Code Playgroud)

然后它已经是完全所需的格式

  1. 该子句的values语法insert需要一个记录列表,如

    insert into t (a, b) values (1, 'x'),(2, 'y')

  2. Psycopg使Python适应tuplePostgresql record.

唯一必要的工作是提供一个由psycopg填充的记录列表模板

# We use the data list to be sure of the template length
records_list_template = ','.join(['%s'] * len(data))
Run Code Online (Sandbox Code Playgroud)

并将其放在insert查询中

insert_query = 'insert into t (a, b) values {}'.format(records_list_template)
Run Code Online (Sandbox Code Playgroud)

打印insert_query输出

insert into t (a, b) values %s,%s
Run Code Online (Sandbox Code Playgroud)

现在以通常的Psycopg论据替代

cursor.execute(insert_query, data)
Run Code Online (Sandbox Code Playgroud)

或者只是测试将发送到服务器的内容

print (cursor.mogrify(insert_query, data).decode('utf8'))
Run Code Online (Sandbox Code Playgroud)

输出:

insert into t (a, b) values (1, 'x'),(2, 'y')
Run Code Online (Sandbox Code Playgroud)

  • [这是带有基准的要点](https://gist.github.com/jsheedy/efa9a69926a754bebf0e9078fd085df6)。在我的拥有 10M 条记录的机器上,copy_from 的速度提高了约 6.5 倍。 (3认同)
  • 该方法的性能与 cur.copy_from 相比如何? (2认同)
  • 使用`execute_values`我能够使我的系统以每分钟1k条记录的速度运行到每分钟128k条记录 (2认同)
  • @Phillipp 这对于每个执行语句来说都是正常的,除非您处于自动提交模式。 (2认同)

Ant*_*aux 53

使用psycopg2 2.7更新:

经典executemany()比@ ant32的实现(称为"折叠")慢大约60倍,如本主题所述:https://www.postgresql.org/message-id/20170130215151.GA7081%40deb76.aryehleib.com

这个实现被添加到版本2.7中的psycopg2并被调用execute_values():

from psycopg2.extras import execute_values
execute_values(cur,
    "INSERT INTO test (id, v1, v2) VALUES %s",
    [(1, 2, 3), (4, 5, 6), (7, 8, 9)])
Run Code Online (Sandbox Code Playgroud)

上一个答案:

要插入多行,使用multirow VALUES语法execute()比使用psycopg2快约10倍executemany().实际上,executemany()只需运行许多个别INSERT陈述.

@ ant32的代码在Python 2中完美运行.但在Python 3中,cursor.mogrify()返回字节,cursor.execute()接受字节或字符串,并','.join()期望str实例.

所以在Python 3中你可能需要修改@ ant32的代码,方法是添加.decode('utf-8'):

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x).decode('utf-8') for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str)
Run Code Online (Sandbox Code Playgroud)

或者仅使用字节(带b''b""):

args_bytes = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute(b"INSERT INTO table VALUES " + args_bytes) 
Run Code Online (Sandbox Code Playgroud)


ptr*_*trn 24

来自Postgresql.org的 Psycopg2教程页面的片段(见下):

我想向您展示的最后一项是如何使用字典插入多行.如果您有以下内容:

namedict = ({"first_name":"Joshua", "last_name":"Drake"},
            {"first_name":"Steven", "last_name":"Foo"},
            {"first_name":"David", "last_name":"Bar"})
Run Code Online (Sandbox Code Playgroud)

您可以使用以下命令轻松地在字典中插入所有三行:

cur = conn.cursor()
cur.executemany("""INSERT INTO bar(first_name,last_name) VALUES (%(first_name)s, %(last_name)s)""", namedict)
Run Code Online (Sandbox Code Playgroud)

它不会节省太多代码,但它确实看起来更好.

  • 这将运行许多单独的`INSERT`语句.有用,但与单个multi-VALAL`d插入不同. (30认同)
  • 在同一个文档中,写有 cur.executemany 语句将自动迭代字典并为每一行执行 INSERT 查询。 (2认同)

Jos*_*edy 23

到目前为止,cursor.copy_from是我发现的批量插入的最快解决方案.这是我制作的包含名为IteratorFile的类的要点,它允许迭代器产生的字符串像文件一样被读取.我们可以使用生成器表达式将每个输入记录转换为字符串.所以解决方案就是

args = [(1,2), (3,4), (5,6)]
f = IteratorFile(("{}\t{}".format(x[0], x[1]) for x in args))
cursor.copy_from(f, 'table_name', columns=('a', 'b'))
Run Code Online (Sandbox Code Playgroud)

对于这个微不足道的args大小,它不会产生很大的速度差异,但是当我处理成千上万的行时,我看到了大的加速.它也比构建一个巨大的查询字符串更有效.迭代器一次只能在内存中保存一条输入记录,在某些时候,通过构建查询字符串,您将在Python进程或Postgres中耗尽内存.

  • [这是一个基准](https://gist.github.com/jsheedy/efa9a69926a754bebf0e9078fd085df6)将copy_from/IteratorFile与查询构建器解决方案进行比较.copy_from在具有10M记录的机器上可以快速扩展到约6.5倍. (3认同)
  • 你是否需要绕过逃脱的字符串和时间戳等? (3认同)

小智 12

安全漏洞

截至 2022 年 11 月 16 日,@Clodoaldo Neto(针对 Psycopg 2.6)、@Joseph Sheedy、@JJ、@Bart Jonk、@kevo Njoki、@TKoutny 和 @Nihal Sharma 的答案包含SQL 注入漏洞,不应使用。

迄今为止最快的提案 ( copy_from) 也不应该使用,因为很难正确转义数据。当尝试插入'"\n\\t或 等字符时,这一点很明显\n

psycopg2 的作者还建议不要copy_from

copy_from() 和 copy_to() 实际上只是古老且不完整的方法

最快的方法

最快的方法是cursor.copy_expert,它可以直接从 CSV 文件插入数据。

with open("mydata.csv") as f:
    cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", f)
Run Code Online (Sandbox Code Playgroud)

copy_expert这也是即时生成 CSV 文件的最快方法。作为参考,请参阅以下CSVFile类,该类会注意限制内存使用。

import io, csv

class CSVFile(io.TextIOBase):
    # Create a CSV file from rows. Can only be read once.
    def __init__(self, rows, size=8192):
        self.row_iter = iter(rows)
        self.buf = io.StringIO()
        self.available = 0
        self.size = size

    def read(self, n):
        # Buffer new CSV rows until enough data is available
        buf = self.buf
        writer = csv.writer(buf)
        while self.available < n:
            try:
                row_length = writer.writerow(next(self.row_iter))
                self.available += row_length
                self.size = max(self.size, row_length)
            except StopIteration:
                break

        # Read requested amount of data from buffer
        write_pos = buf.tell()
        read_pos = write_pos - self.available
        buf.seek(read_pos)
        data = buf.read(n)
        self.available -= len(data)

        # Shrink buffer if it grew very large
        if read_pos > 2 * self.size:
            remaining = buf.read()
            buf.seek(0)
            buf.write(remaining)
            buf.truncate()
        else:
            buf.seek(write_pos)

        return data
Run Code Online (Sandbox Code Playgroud)

然后可以像这样使用该类:

rows = [(1, "a", "b"), (2, "c", "d")]
cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", CSVFile(rows))
Run Code Online (Sandbox Code Playgroud)

如果您的所有数据都适合内存,您也可以直接生成整个 CSV 数据而无需类CSVFile,但如果您不知道将来要插入多少数据,您可能不应该这样做。

f = io.StringIO()
writer = csv.writer(f)
for row in rows:
    writer.writerow(row)
f.seek(0)
cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", f)
Run Code Online (Sandbox Code Playgroud)

基准测试结果

  • 914 毫秒 - 多次调用cursor.execute
  • 846 毫秒 -cursor.executemany
  • 362 毫秒 -psycopg2.extras.execute_batch
  • 346 毫秒 -execute_batchpage_size=1000
  • 265 毫秒 -execute_batch带准备好的语句
  • 161 毫秒 -psycopg2.extras.execute_values
  • 127 毫秒 -cursor.execute带字符串连接值
  • 39 毫秒 -copy_expert一次生成整个 CSV 文件
  • 32 毫秒 -copy_expertCSVFile


J.J*_*J.J 6

所有这些技术在Postgres术语中称为"扩展插入",截至2016年11月24日,它仍然比psychopg2的executemany()和此线程中列出的所有其他方法(我在此之前尝试过)快一点回答).

这里有一些代码没有使用cur.mogrify,而且很简单,只需要了解一下:

valueSQL = [ '%s', '%s', '%s', ... ] # as many as you have columns.
sqlrows = []
rowsPerInsert = 3 # more means faster, but with diminishing returns..
for row in getSomeData:
        # row == [1, 'a', 'yolo', ... ]
        sqlrows += row
        if ( len(sqlrows)/len(valueSQL) ) % rowsPerInsert == 0:
                # sqlrows == [ 1, 'a', 'yolo', 2, 'b', 'swag', 3, 'c', 'selfie' ]
                insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*rowsPerInsert)
                cur.execute(insertSQL, sqlrows)
                con.commit()
                sqlrows = []
insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*len(sqlrows))
cur.execute(insertSQL, sqlrows)
con.commit()
Run Code Online (Sandbox Code Playgroud)

但应该注意的是,如果你可以使用copy_from(),你应该使用copy_from;)