使用原始 PyMySQL 为 celery 进行多线程处理

jul*_*sne 6 python mysql celery flask pymysql

在我目前正在进行的项目中,我不允许使用 ORM,所以我自己

它工作得很好,但我在使用 Celery 时遇到了问题,而且它是并发性的。有一段时间,我将它设置为1(使用--concurrency=1),但我正在添加新任务,这些任务需要更多的时间来处理,而不是使用 celery beat 运行,这会导致大量的任务积压。

当我将 celery 的并发设置为 > 1 时,会发生以下情况(pastebin 因为它很大):

https://pastebin.com/M4HZXTDC

关于如何在其他进程上实现某种锁定/等待以便不同的工作人员不会相互交叉的任何想法?

编辑:这是我设置PyMySQL 实例以及如何处理打开和关闭的地方

syt*_*ech 1

PyMSQL不允许线程共享同一个连接(模块可以共享,但线程不能共享连接)。您的模型类在各处重用相同的连接

因此,当不同的工作人员调用模型进行查询时,他们使用相同的连接对象,从而导致冲突。

确保您的连接对象是线程本地的。不要使用db类属性,而是考虑一种检索线程本地连接对象的方法,而不是重用可能在不同线程中创建的连接对象。

例如,在任务中创建连接

现在,您在每个模型的任何地方都使用全局连接。

# Connect to the database
connection = pymysql.connect(**database_config)


class Model(object):
    """
    Base Model class, all other Models will inherit from this
    """

    db = connection
Run Code Online (Sandbox Code Playgroud)

为了避免这种情况,您可以在方法中创建数据库__init__......

class Model(object):
    """
    Base Model class, all other Models will inherit from this
    """

    def __init__(self, *args, **kwargs):
        self.db = pymysql.connect(**database_config)
Run Code Online (Sandbox Code Playgroud)

但是,这可能不高效/不实用,因为 db 对象的每个实例都会创建一个会话。

为了改进这一点,您可以使用一种方法threading.local来将连接保持在线程本地。



class Model(object):
    """
    Base Model class, all other Models will inherit from this
    """
    _conn = threading.local()
    @property
    def db(self):
        if not hasattr(self._conn, 'db'):
            self._conn.db = pymysql.connect(**database_config)
        return self._conn.db

Run Code Online (Sandbox Code Playgroud)

请注意,假设您使用线程并发模型,线程本地解决方案就可以工作。另请注意,celery 默认情况下使用多个进程(prefork)。这可能是问题,也可能不是问题。如果这是一个问题,如果您将工作线程更改为使用 eventlet,也许可以解决它。