当SQLAlchemy事件触发Celery任务时,将关闭连接

deB*_*ice 7 python unit-testing sqlalchemy celery flask

当我的一个单元测试删除SQLAlchemy对象时,该对象会触发after_delete事件,该事件触发Celery任务从驱动器中删除文件.

任务是CELERY_ALWAYS_EAGER = True在测试时.

要轻松重现这个问题

该示例有两个测试.一个触发事件中的任务,另一个触发事件.只有事件中的一个关闭连接.

要快速重现错误,您可以运行:

git clone https://gist.github.com/5762792fc1d628843697.git
cd 5762792fc1d628843697
virtualenv venv
. venv/bin/activate
pip install -r requirements.txt
python test.py
Run Code Online (Sandbox Code Playgroud)

堆栈:

$     python test.py
E
======================================================================
ERROR: test_delete_task (__main__.CeleryTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "test.py", line 73, in test_delete_task
    db.session.commit()
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/scoping.py", line 150, in do
    return getattr(self.registry(), name)(*args, **kwargs)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 776, in commit
    self.transaction.commit()
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 377, in commit
    self._prepare_impl()
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 357, in _prepare_impl
    self.session.flush()
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1919, in flush
    self._flush(objects)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flush
    transaction.rollback(_capture_exception=True)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 63, in __exit__
    compat.reraise(type_, value, traceback)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flush
    transaction.rollback(_capture_exception=True)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 393, in rollback
    self._assert_active(prepared_ok=True, rollback_ok=True)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 223, in _assert_active
    raise sa_exc.ResourceClosedError(closed_msg)
ResourceClosedError: This transaction is closed

----------------------------------------------------------------------
Ran 1 test in 0.014s

FAILED (errors=1)
Run Code Online (Sandbox Code Playgroud)

Rac*_*ers 8

我想我发现了问题 - 这就是你如何设置你的Celery任务.如果从芹菜设置中删除应用程序上下文调用,一切运行正常:

class ContextTask(TaskBase):
    abstract = True

    def __call__(self, *args, **kwargs):
        # deleted --> with app.app_context():
        return TaskBase.__call__(self, *args, **kwargs)
Run Code Online (Sandbox Code Playgroud)

SQLAlchemy文档中有一个很大的警告,即在after_delete事件期间永远不会修改会话:http://docs.sqlalchemy.org/en/latest/orm/events.html#sqlalchemy.orm.events.MapperEvents.after_delete

所以我怀疑with app.app_context():在删除过程中调用了它,试图附加和/或修改Flask-SQLAlchemy存储在app对象中的会话,因此整个事情就是轰炸.

Flask-SQlAlchemy为你做了很多魔术,但是你可以绕过它并直接使用SQLAlchemy.如果在删除事件期间需要与数据库通信,则可以创建到db的新会话:

@celery.task()
def my_task():
    # obviously here I create a new object
    session = db.create_scoped_session()
    session.add(User(id=13, value="random string"))
    session.commit()
    return
Run Code Online (Sandbox Code Playgroud)

但听起来你不需要这个,你只是想删除一个图像路径.在这种情况下,我只会更改您的任务,因此需要一条路径:

# instance will call the task
@event.listens_for(User, "after_delete")
def after_delete(mapper, connection, target):
    my_task.delay(target.value)

@celery.task()
def my_task(image_path):
    os.remove(image_path) 
Run Code Online (Sandbox Code Playgroud)

希望这有用 - 如果有任何不适合你,请告诉我.感谢非常详细的设置,它确实有助于调试.