使用SQLAlchemy和多处理挂起Python脚本

Fah*_*tha 7 python postgresql sqlalchemy multiprocessing

请考虑以下Python脚本,该脚本使用SQLAlchemy和Python多处理模块.这是在Debian squeeze上使用Python 2.6.6-8 + b1(默认)和SQLAlchemy 0.6.3-3(默认).这是一些实际代码的简化版本.

import multiprocessing
from sqlalchemy import *
from sqlalchemy.orm import *
dbuser = ...
password = ...
dbname = ...
dbstring = "postgresql://%s:%s@localhost:5432/%s"%(dbuser, password, dbname)
db = create_engine(dbstring)
m = MetaData(db)

def make_foo(i):
    t1 = Table('foo%s'%i, m, Column('a', Integer, primary_key=True))

conn = db.connect()
for i in range(10):
    conn.execute("DROP TABLE IF EXISTS foo%s"%i)
conn.close()
db.dispose()

for i in range(10):
    make_foo(i)

m.create_all()

def do(kwargs):
    i, dbstring = kwargs['i'], kwargs['dbstring']

    db = create_engine(dbstring)
    Session = scoped_session(sessionmaker())
    Session.configure(bind=db)
    Session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;")
    Session.commit()
    db.dispose()

pool = multiprocessing.Pool(processes=5)               # start 4 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i, 'dbstring':dbstring})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously
r.get()
r.wait()
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)

此脚本挂起以下错误消息.

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.6/multiprocessing/pool.py", line 259, in _handle_results
    task = get()
TypeError: ('__init__() takes at least 4 arguments (2 given)', <class 'sqlalchemy.exc.ProgrammingError'>, ('(ProgrammingError) syntax error at or near "%"\nLINE 1: COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;\n        ^\n',))
Run Code Online (Sandbox Code Playgroud)

当然,这里的语法错误是TRUNCATE foo%s;.我的问题是,为什么这个过程悬而未决,我可以说服它以错误退出,而不对我的代码进行大手术吗?此行为与我的实际代码非常相似.

请注意,如果语句被类似的替换,则不会发生挂起print foobarbaz.此外,如果我们更换,仍然会发生挂起

Session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;")
Session.commit()
db.dispose()
Run Code Online (Sandbox Code Playgroud)

只是 Session.execute("TRUNCATE foo%s;")

我正在使用以前的版本,因为它更接近我的实际代码.

此外,multiprocessing从图片中删除并连续循环遍历表使挂起消失,它只是退出时出错.

我也对错误的形式感到困惑,尤其是错误TypeError: ('__init__() takes at least 4 arguments (2 given)' .这个错误来自哪里?它似乎来自multiprocessing代码中的某个地方.

PostgreSQL日志没有帮助.我看到很多线条

2012-01-09 14:16:34.174 IST [7810] 4f0aa96a.1e82/1 12/583 0 ERROR:  syntax error at or near "%" at character 28
2012-01-09 14:16:34.175 IST [7810] 4f0aa96a.1e82/2 12/583 0 STATEMENT:  COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;
Run Code Online (Sandbox Code Playgroud)

但没有其他似乎相关的东西.

更新1:感谢lbolla和他富有洞察力的分析,我能够提交有关此问题的Python错误报告.请参阅该报告中的sbt分析,以及此处.另请参阅Python错误报告修复异常酸洗.因此,按照sbt的解释,我们可以重现原始错误

import sqlalchemy.exc
e = sqlalchemy.exc.ProgrammingError("", {}, None)
type(e)(*e.args)
Run Code Online (Sandbox Code Playgroud)

这使

Traceback (most recent call last):
  File "<stdin>", line 9, in <module>
TypeError: __init__() takes at least 4 arguments (2 given)
Run Code Online (Sandbox Code Playgroud)

更新2:至少对于SQLAlchemy,Mike Bayer已修复此问题,请参阅错误报告StatementError Exceptions不可选..根据Mike的建议,我也向psycopg2报告了类似的错误,尽管我没有(并且没有)有一个破损的实际例子.无论如何,他们显然已经修复了它,尽管他们没有提供修复的细节.请参阅psycopg异常无法进行pickle.为了更好的衡量,我还报告了一个Python错误ConfigParser异常不是pickleable对应于提到的SO问题lbolla.他们似乎想要对此进行测试.

无论如何,在可预见的未来,这似乎仍将是一个问题,因为总的来说,Python开发人员似乎并不知道这个问题,所以不要防范它.令人惊讶的是,似乎没有足够的人使用多处理这是一个众所周知的问题,或者他们可能只是忍受它.我希望Python开发人员能够至少为Python 3修复它,因为它很烦人.

我接受了lbolla的回答,因为如果不解释问题与异常处理的关系,我可能无法理解这一点.我还要感谢sbt,他解释说Python无法解决异常问题.我非常感谢他们两个,请将他们的答案投票.谢谢.

更新3:我发布了一个后续问题:捕获不可推翻的异常并重新提升.

lbo*_*lla 11

我相信,TypeError来自multiprocessingget.

我已从您的脚本中删除了所有数据库代码.看看这个:

import multiprocessing
import sqlalchemy.exc

def do(kwargs):
    i = kwargs['i']
    print i
    raise sqlalchemy.exc.ProgrammingError("", {}, None)
    return i


pool = multiprocessing.Pool(processes=5)               # start 4 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously

# Use get or wait?
# r.get()
r.wait()

pool.close()
pool.join()
print results
Run Code Online (Sandbox Code Playgroud)

使用r.wait返回预期的结果,但使用r.get加注TypeError.正如在python的文档中描述的那样,在使用r.wait之后使用map_async.

编辑:我必须修改我以前的答案.我现在相信TypeError它来自SQLAlchemy.我修改了我的脚本以重现错误.

编辑2:看起来问题是,multiprocessing.pool如果任何工作者引发一个构造函数需要参数的异常,那么它就不能很好地运行(另请参见此处).

我修改了我的脚本以突出显示这一点.

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

class GoodExc(Exception):
    def __init__(self, a=None):
        '''Optional param in the constructor.'''
        self.a = a

def do(kwargs):
    i = kwargs['i']
    print i
    raise BadExc('a')
    # raise GoodExc('a')
    return i

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results
Run Code Online (Sandbox Code Playgroud)

在您的情况下,假设您的代码引发了SQLAlchemy异常,我能想到的唯一解决方案是捕获do函数中的所有异常并重新引发正常Exception.像这样的东西:

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

def do(kwargs):
    try:
        i = kwargs['i']
        print i
        raise BadExc('a')
        return i
    except Exception as e:
        raise Exception(repr(e))

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results
Run Code Online (Sandbox Code Playgroud)

编辑3:所以,它似乎是Python的一个错误,但SQLAlchemy中的正确异常会解决它:因此,我也提出了SQLAlchemy的问题.

作为解决问题的方法,我认为Edit 2末尾的解决方案会做(在try-except和re-raise中包装回调).