我正在部署一个应用程序来使用一些 .csv 数据。我想将它们复制到 MySQL 表中。在 stackoverflow 用户的帮助下,我编写了以下代码:
import csv
import MySQLdb
db = MySQLdb.connect( host = "dbname.description.host.com",
user = "user",
passwd = "key",
db = "dbname")
cursor = db.cursor()
query = 'INSERT INTO table_name(column,column_1,column_2,column_3)
VALUES(%s, %s, %s, %s)'
csv_data = csv.reader(file('file_name'))
for row in csv_data:
cursor.execute(query,row)
db.commit()
cursor.close()
Run Code Online (Sandbox Code Playgroud)
问题是,目前这个过程太慢了,我需要加快速度。
谢谢
Mik*_*ung 16
您可以使用executemany如下批处理作业
import csv
import MySQLdb
db = MySQLdb.connect( host = "dbname.description.host.com",
user = "user",
passwd = "key",
db = "dbname")
cursor = db.cursor()
query = 'INSERT INTO table_name(column,column_1,column_2,column_3)
VALUES(%s, %s, %s, %s)'
csv_data = csv.reader(file('file_name'))
my_data = []
for row in csv_data:
my_data.append(tuple(row))
cursor.executemany(query, my_data)
cursor.close()
Run Code Online (Sandbox Code Playgroud)
由于多种原因,您正在使用的代码效率极低,因为您一次提交一行数据(这将是您想要的事务数据库或进程),而不是一次性转储。
有很多方法可以加快速度,从很好到不太好。这里有 4 种方法,包括幼稚的实现(上图)
#!/usr/bin/env python
import pandas as pd
import numpy as np
import odo
import profilehooks
import sqlalchemy
import csv
import os
def create_test_data():
n = 100000
df = pd.DataFrame(dict(
id=np.random.randint(0, 1000000, n),
col1=np.random.choice(['hello', 'world', 'python', 'large string for testing ' * 10], n),
col2=np.random.randint(-1000000, 1000000, n),
col3=np.random.randint(-9000000, 9000000, n),
col4=(np.random.random(n) - 0.5) * 99999
), columns=['id', 'col1', 'col2', 'col3', 'col4'])
df.to_csv('tmp.csv', index=False)
@profilehooks.timecall
def using_pandas(table_name, uri):
df = pd.read_csv('tmp.csv')
df.to_sql(table_name, con=uri, if_exists='append', index=False)
@profilehooks.timecall
def using_odo(table_name, uri):
odo.odo('tmp.csv', '%s::%s' % (uri, table_name))
@profilehooks.timecall
def using_cursor(table_name, uri):
engine = sqlalchemy.create_engine(uri)
query = 'INSERT INTO {} (id, col1, col2, col3, col4) VALUES(%s, %s, %s, %s, %s)'
query = query.format(table_name)
con = engine.raw_connection()
with con.cursor() as cursor:
with open('tmp.csv') as fh:
reader = csv.reader(fh)
next(reader) # Skip firt line (headers)
for row in reader:
cursor.execute(query, row)
con.commit()
con.close()
@profilehooks.timecall
def using_cursor_correct(table_name, uri):
engine = sqlalchemy.create_engine(uri)
query = 'INSERT INTO {} (id, col1, col2, col3, col4) VALUES(%s, %s, %s, %s, %s)'
query = query.format(table_name)
with open('tmp.csv') as fh:
reader = csv.reader(fh)
next(reader) # Skip firt line (headers)
data = list(reader)
engine.execute(query, data)
def main():
uri = 'mysql+pymysql://root:%s@localhost/test' % os.environ['pass']
engine = sqlalchemy.create_engine(uri)
for i in (1, 2, 3, 4):
engine.execute("DROP TABLE IF EXISTS table%s" % i)
engine.execute("""
CREATE TABLE table%s(
id INT,
col1 VARCHAR(255),
col2 INT,
col3 INT,
col4 DOUBLE
);
""" % i)
create_test_data()
using_odo('table1', uri)
using_pandas('table4', uri)
using_cursor_correct('table3', uri)
using_cursor('table2', uri)
for i in (1, 2, 3, 4):
count = pd.read_sql('SELECT COUNT(*) as c FROM table%s' % i, con=uri)['c'][0]
print("Count for table%s - %s" % (i, count))
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
odo 方法是最快的(在幕后使用 MySQL LOAD DATA INFILE)其次是 Pandas(优化了关键代码路径)其次是使用原始游标但批量插入行最后是天真的方法,一次提交一行
以下是针对本地 MySQL 服务器在本地运行的一些示例计时。
using_odo (./test.py:29): 0.516 秒
using_pandas (./test.py:23):3.039 秒
using_cursor_correct (./test.py:50): 12.847 秒
using_cursor (./test.py:34): 43.470 秒
表 1 的计数 - 100000
表 2 的计数 - 100000
表 3 的计数 - 100000
表 4 的计数 - 100000
如您所见,简单的实现比 odo 慢了大约 100 倍。并且比使用熊猫慢 10 倍
将提交取出:
for row in csv_data:
cursor.execute(query,row)
db.commit()
Run Code Online (Sandbox Code Playgroud)
它将完成更少的工作并且速度更快
| 归档时间: |
|
| 查看次数: |
20165 次 |
| 最近记录: |