使用带有嵌套查询的python MySQLDB SScursor

shi*_*eta 6 python mysql-python

当生成大型结果集时,典型的MySQLdb库查询可能会占用大量内存并在Python中表现不佳.例如:

cursor.execute("SELECT id, name FROM `table`")
for i in xrange(cursor.rowcount):
    id, name = cursor.fetchone()
    print id, name
Run Code Online (Sandbox Code Playgroud)

有一个可选的游标,一次只能获取一行,真正加快了脚本的速度,并大大减少了脚本的内存占用.

import MySQLdb
import MySQLdb.cursors

conn = MySQLdb.connect(user="user", passwd="password", db="dbname", 
                       cursorclass = MySQLdb.cursors.SSCursor)
cur = conn.cursor()
cur.execute("SELECT id, name FROM users")
row = cur.fetchone()
while row is not None:
    doSomething()
    row = cur.fetchone()    
cur.close()
conn.close()
Run Code Online (Sandbox Code Playgroud)

但我找不到SSCursor与嵌套查询一起使用的任何内容.如果这是定义doSomething():

def doSomething()
    cur2 = conn.cursor()
    cur2.execute('select id,x,y from table2')
    rows = cur2.fetchall()
    for row in rows:
        doSomethingElse(row)
    cur2.close()
Run Code Online (Sandbox Code Playgroud)

然后脚本抛出以下错误:

_mysql_exceptions.ProgrammingError: (2014, "Commands out of sync; you can't run this command now")
Run Code Online (Sandbox Code Playgroud)

听起来好像SSCursor与嵌套查询不兼容.真的吗?如果是这样太糟糕了,因为主循环似乎用标准光标运行得太慢了.

Air*_*Air 8

这个问题在讨论的MySQLdb的用户指南中的一点,标题下threadsafety属性(重点煤矿):

MySQL协议无法一次使用同一连接处理多个线程.一些早期版本的MySQLdb的所利用的锁定,实现2. threadsafety虽然这不是太难使用标准Cursor类(使用来完成 mysql_store_result()),它是由SSCursor(使用复杂 mysql_use_result(); 而后者则必须确保所有的行在执行另一个查询之前已经读过.

MySLQ C API函数的文档mysql_use_result()提供了有关错误消息的更多信息:

使用时mysql_use_result(),必须执行mysql_fetch_row() 直到NULL返回值,否则,未提取的行将作为下一个查询的结果集的一部分返回.Commands out of sync; you can't run this command now 如果您忘记这样做,C API会给出错误!

换句话说,你必须完全获取结果从任何缓冲光标设定(即,一个使用mysql_use_result()来代替mysql_store_result()-用MySQLdb的,这意味着SSCursorSSDictCursor),才能在同一个连接上执行另一个语句.

在您的情况下,最直接的解决方案是在迭代未缓冲查询的结果集时打开第二个连接.(从同一连接中简单地获取缓冲光标是不行的;在使用缓冲光标之前,您仍然需要超过无缓冲的结果集.)

如果您的工作流程类似于"遍历大结果集,对每行执行N个小查询",请考虑将MySQL的存储过程作为嵌套来自不同连接的游标的替代方法.您仍然可以使用MySQLdb来调用该过程并获得结果,但您肯定希望阅读MySQLdb callproc()方法的文档,因为它在检索过程输出时不符合Python的数据库API规范.


第二种方法是坚持使用缓冲光标,但将查询拆分为批量.这就是我去年为一个项目做的事情,我需要遍历一组数百万行,用内部模块解析一些数据,并在处理每一行后执行一些INSERTUPDATE查询.一般的想法看起来像这样:

QUERY = r"SELECT id, name FROM `table` WHERE id BETWEEN %s and %s;"
BATCH_SIZE = 5000

i = 0
while True:
    cursor.execute(QUERY, (i + 1, i + BATCH_SIZE))
    result = cursor.fetchall()

    # If there's no possibility of a gap as large as BATCH_SIZE in your table ids,
    # you can test to break out of the loop like this (otherwise, adjust accordingly):
    if not result:
        break

    for row in result:
        doSomething()

    i += BATCH_SIZE
Run Code Online (Sandbox Code Playgroud)

关于您的示例代码,我要注意的另一件事是您可以直接在MySQLdb中的游标上迭代而不是fetchone()显式调用xrange(cursor.rowcount).这在使用无缓冲游标时尤其重要,因为该rowcount属性未定义并且会产生非常意外的结果(请参阅:使用带有SSDictCursor的cursor.rowcount的Python MysqlDB返回错误计数).