如何在没有全局变量的芹菜任务中设置sqlalchemy会话

Clé*_*ner 8 python sqlalchemy celery

简介:我想在celery任务中使用sqlalchemy会话,而不需要包含该会话的全局变量.

我在一个芹菜任务的项目中使用SQLAlchemy,而我正在使用它

Currently, I have a global variable 'session' defined along with my celery app setup (celery.py), with a worker signal to set it up.

session = scoped_session(sessionmaker())

@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
    # load the application configuration
    # db_uri = conf['db_uri']
    engine = create_engine(db_uri)
    session.configure(bind=engine)
Run Code Online (Sandbox Code Playgroud)

In the module defining the tasks, I simply import 'session' and use it. Tasks are defined with a custom class that closes the session after returning:

class DBTask(Task):
    def after_return(self, *args, **kwargs):
        session.remove()
Run Code Online (Sandbox Code Playgroud)

That works well, however: when unit testing with CELERY_ALWAYS_EAGER=True, the session won't be configured. The only solution I've found so far is to mock that 'session' variable when running a task in a unit test:

with mock.patch('celerymodule.tasks.session', self.session):
    do_something.delay(...)
Run Code Online (Sandbox Code Playgroud)

While it works, I don't want to do that.

Is there any way to setup a session that will no be a global variable, that will work both for normal asynchronous behavior and without workers with CELERY_ALWAYS_EAGER=True?

Clé*_*ner 12

在关于自定义任务类的官方文档中,答案就在我的鼻子底下.

我修改了用于访问数据库的任务的自定义任务类:

class DBTask(Task):
    _session = None

    def after_return(self, *args, **kwargs):
        if self._session is not None:
            self._session.remove()

    @property
    def session(self):
        if self._session is None:
            _, self._session = _get_engine_session(self.conf['db_uri'],
                                                   verbose=False)

        return self._session
Run Code Online (Sandbox Code Playgroud)

我这样定义我的任务:

@app.task(base=DBTask, bind=True)
def do_stuff_with_db(self, conf, some_arg):
    self.conf = conf
    thing = self.session.query(Thing).filter_by(arg=some_arg).first()
Run Code Online (Sandbox Code Playgroud)

这样,SQLAlchemy会话只会为每个芹菜工作进程创建一次,而且我不需要任何全局变量.

这解决了我的单元测试的问题,因为SQLAlchemy会话设置现在独立于芹菜工作者.

  • 对于正在查看并考虑将其用于数据库连接的其他人,而不仅仅是会话,请注意,您不希望在每个任务结束时调用 `after_return()`... session,而不是全连接。 (4认同)
  • @AmitTalmor celery bind=True 参数将 `self` 作为第一个参数作为任务的实例传递。`self.conf` 继承自 `Task`,所以它已经在那里定义了 (2认同)