为每个芹菜工人创建单独的数据库连接

Ven*_*tra 7 python mysql sqlalchemy celery django-celery

当工作人员在创建之后执行任务时,我一直在遇到奇怪的mysql问题.

我们使用django 1.3,芹菜3.1.17,djorm-ext-pool 0.5

我们用并发启动芹菜进程3.到目前为止我的观察是,当工作进程启动时,它们都得到相同的mysql连接.我们记录数据库连接ID,如下所示.

from django.db import connection
connection.cursor()
logger.info("Task %s processing with db connection %s", str(task_id), str(connection.connection.thread_id()))
Run Code Online (Sandbox Code Playgroud)

当所有工作人员获得任务时,第一个工作成功执行,但另外两个工作人员发出奇怪的Mysql错误.它或者是"Mysql服务器消失"的错误,或者是Django抛出"DoesNotExist"错误的情况.显然Django查询的对象确实存在.

在此错误之后,每个工作程序开始获得自己的数据库连接,之后我们没有发现任何问题.

芹菜的默认行为是什么?它是否旨在共享相同的数据库连接.如果是这样,如何处理进程间通信?理想情况下,我希望每个工作者都有不同的数据库连接

我尝试了下面链接中提到的代码,但是没有用. 芹菜工人数据库连接池

我们还修复了下面建议的芹菜代码. https://github.com/celery/celery/issues/2453

对于那些提出问题的人,请让我知道downvote的原因.

Ven*_*tra 4

Celery 通过以下命令启动

celery -A myproject worker --loglevel=debug --concurrency=3 -Q testqueue
Run Code Online (Sandbox Code Playgroud)

myproject.py作为主进程的一部分,在分叉工作进程之前对 mysql 数据库进行一些查询。

作为主进程中查询流的一部分,django ORM 会创建一个 sqlalchemy 连接池(如果它尚不存在)。然后创建工作进程。

Celery 作为 django 修复程序的一部分关闭现有连接。

    def close_database(self, **kwargs):
    if self._close_old_connections:
        return self._close_old_connections()  # Django 1.6
    if not self.db_reuse_max:
        return self._close_database()
    if self._db_recycles >= self.db_reuse_max * 2:
        self._db_recycles = 0
        self._close_database()
    self._db_recycles += 1
Run Code Online (Sandbox Code Playgroud)

实际上,可能发生的情况是,具有一个未使用的数据库连接的 sqlalchemy 池对象在分叉时被复制到第 3 个工作进程。因此,3 个不同的池有 3 个连接对象指向相同的连接文件描述符。

当工作人员在执行任务时要求数据库连接时,所有工作人员都会从 sqlalchemy 池中获取相同的未使用连接,因为该连接当前未使用。所有连接都指向同一个文件描述符这一事实导致了 MySQL 连接消失错误。

之后创建的新连接都是新的,并且不指向相同的套接字文件描述符。

解决方案:

在主流程中添加

from django.db import connection
connection.cursor()
Run Code Online (Sandbox Code Playgroud)

在完成任何导入之前。即在djorm-ext-pool添加模块之前。

这样,所有数据库查询都将使用 django 在池外创建的连接。当 celery django fixup 关闭连接时,连接实际上被关闭,而不是返回到 alchemy 池,在分叉时复制到所有工作线程时,alchemy 池中没有任何连接。此后,当工作人员请求数据库连接时,sqlalchemy 会返回新创建的连接之一。