使用 Python 加速(批量)插入 MySQL

Ped*_*ros 8 python mysql csv

我正在部署一个应用程序来使用一些 .csv 数据。我想将它们复制到 MySQL 表中。在 stackoverflow 用户的帮助下,我编写了以下代码:

import csv
import MySQLdb

db = MySQLdb.connect(   host = "dbname.description.host.com",
                        user = "user",
                        passwd = "key",
                        db = "dbname")
cursor = db.cursor()

query = 'INSERT INTO table_name(column,column_1,column_2,column_3)
VALUES(%s, %s, %s, %s)'                                                         

csv_data = csv.reader(file('file_name'))

for row in csv_data:
     cursor.execute(query,row)
     db.commit()

cursor.close()
Run Code Online (Sandbox Code Playgroud)

问题是,目前这个过程太慢了,我需要加快速度。

谢谢

Mik*_*ung 16

您可以使用executemany如下批处理作业

import csv
import MySQLdb

db = MySQLdb.connect(   host = "dbname.description.host.com",
                        user = "user",
                        passwd = "key",
                        db = "dbname")
cursor = db.cursor()

query = 'INSERT INTO table_name(column,column_1,column_2,column_3)
VALUES(%s, %s, %s, %s)'                                                         

csv_data = csv.reader(file('file_name'))

my_data = []
for row in csv_data:
     my_data.append(tuple(row))

cursor.executemany(query, my_data)
cursor.close()
Run Code Online (Sandbox Code Playgroud)

  • executemany 是批处理操作还是逐个遍历整个列表并插入它们?基本上是 executemany = 循环的 execute() ? (3认同)
  • @shreeshkatti大多数时候,它是一个循环,[插入除外](https://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlcursor-executemany.html)已优化。 (2认同)

Rog*_*mas 7

由于多种原因,您正在使用的代码效率极低,因为您一次提交一行数据(这将是您想要的事务数据库或进程),而不是一次性转储。

有很多方法可以加快速度,从很好到不太好。这里有 4 种方法,包括幼稚的实现(上图)

#!/usr/bin/env python
import pandas as pd
import numpy as np
import odo
import profilehooks
import sqlalchemy
import csv
import os


def create_test_data():
    n = 100000
    df = pd.DataFrame(dict(
        id=np.random.randint(0, 1000000, n),
        col1=np.random.choice(['hello', 'world', 'python', 'large string for testing ' * 10], n),
        col2=np.random.randint(-1000000, 1000000, n),
        col3=np.random.randint(-9000000, 9000000, n),
        col4=(np.random.random(n) - 0.5) * 99999
    ), columns=['id', 'col1', 'col2', 'col3', 'col4'])
    df.to_csv('tmp.csv', index=False)


@profilehooks.timecall
def using_pandas(table_name, uri):
    df = pd.read_csv('tmp.csv')
    df.to_sql(table_name, con=uri, if_exists='append', index=False)


@profilehooks.timecall
def using_odo(table_name, uri):
    odo.odo('tmp.csv', '%s::%s' % (uri, table_name))


@profilehooks.timecall
def using_cursor(table_name, uri):
    engine = sqlalchemy.create_engine(uri)
    query = 'INSERT INTO {} (id, col1, col2, col3, col4) VALUES(%s, %s, %s, %s, %s)'
    query = query.format(table_name)
    con = engine.raw_connection()
    with con.cursor() as cursor:
        with open('tmp.csv') as fh:
            reader = csv.reader(fh)
            next(reader)  # Skip firt line (headers)
            for row in reader:
                cursor.execute(query, row)
                con.commit()
    con.close()


@profilehooks.timecall
def using_cursor_correct(table_name, uri):
    engine = sqlalchemy.create_engine(uri)
    query = 'INSERT INTO {} (id, col1, col2, col3, col4) VALUES(%s, %s, %s, %s, %s)'
    query = query.format(table_name)
    with open('tmp.csv') as fh:
        reader = csv.reader(fh)
        next(reader)  # Skip firt line (headers)
        data = list(reader)
    engine.execute(query, data)


def main():
    uri = 'mysql+pymysql://root:%s@localhost/test' % os.environ['pass']

    engine = sqlalchemy.create_engine(uri)
    for i in (1, 2, 3, 4):
        engine.execute("DROP TABLE IF EXISTS table%s" % i)
        engine.execute("""
            CREATE TABLE table%s(
                id INT,
                col1 VARCHAR(255),
                col2 INT,
                col3 INT,
                col4 DOUBLE
            );
        """ % i)
    create_test_data()

    using_odo('table1', uri)
    using_pandas('table4', uri)
    using_cursor_correct('table3', uri)
    using_cursor('table2', uri)

    for i in (1, 2, 3, 4):
        count = pd.read_sql('SELECT COUNT(*) as c FROM table%s' % i, con=uri)['c'][0]
        print("Count for table%s - %s" % (i, count))


if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

odo 方法是最快的(在幕后使用 MySQL LOAD DATA INFILE)其次是 Pandas(优化了关键代码路径)其次是使用原始游标但批量插入行最后是天真的方法,一次提交一行

以下是针对本地 MySQL 服务器在本地运行的一些示例计时。

using_odo (./test.py:29): 0.516 秒

using_pandas (./test.py:23):3.039 秒

using_cursor_correct (./test.py:50): 12.847 秒

using_cursor (./test.py:34): 43.470 秒

表 1 的计数 - 100000

表 2 的计数 - 100000

表 3 的计数 - 100000

表 4 的计数 - 100000

如您所见,简单的实现比 odo 慢了大约 100 倍。并且比使用熊猫慢 10 倍


nac*_*cho 1

将提交取出:

for row in csv_data:
     cursor.execute(query,row)
db.commit()
Run Code Online (Sandbox Code Playgroud)

它将完成更少的工作并且速度更快