multiprocessing/psycopg2 TypeError:无法pickle _thread.RLock对象

Jus*_*ser 9 python postgresql sqlalchemy psycopg2 python-multiprocessing

我按照下面的代码在postgres数据库上实现并行选择查询:

https://tech.geoblink.com/2017/07/06/parallelizing-queries-in-postgresql-with-python/

我的基本问题是我有大约6k个需要执行的查询,我正在尝试优化这些选择查询的执行.最初它是一个where id in (...)包含所有6k谓词ID 的单个查询,但是我在运行它的机器上使用> 4GB的RAM遇到了查询问题,因此我决定将其拆分为6k个别查询,当同步保持时稳定的内存使用.然而,运行时间需要更长的时间,这对我的用例来说不是一个问题.即便如此,我也在尽量减少时间.

这就是我的代码:

class PostgresConnector(object):
    def __init__(self, db_url):
        self.db_url = db_url
        self.engine = self.init_connection()
        self.pool = self.init_pool()

    def init_pool(self):
        CPUS = multiprocessing.cpu_count()
        return multiprocessing.Pool(CPUS)

    def init_connection(self):
        LOGGER.info('Creating Postgres engine')
        return create_engine(self.db_url)

    def run_parallel_queries(self, queries):
        results = []
        try:
            for i in self.pool.imap_unordered(self.execute_parallel_query, queries):
                results.append(i)
        except Exception as exception:
            LOGGER.error('Error whilst executing %s queries in parallel: %s', len(queries), exception)
            raise
        finally:
            self.pool.close()
            self.pool.join()

        LOGGER.info('Parallel query ran producing %s sets of results of type: %s', len(results), type(results))

        return list(chain.from_iterable(results))

    def execute_parallel_query(self, query):
        con = psycopg2.connect(self.db_url)
        cur = con.cursor()
        cur.execute(query)
        records = cur.fetchall()
        con.close()

        return list(records)
Run Code Online (Sandbox Code Playgroud)

但是无论何时运行,我都会收到以下错误:

TypeError: can't pickle _thread.RLock objects
Run Code Online (Sandbox Code Playgroud)

我已经阅读了许多关于多处理和可拾取对象的使用的类似问题,但我不知道我的生活中弄清楚我做错了什么.

该池通常是每个进程一个(我认为这是最佳实践),但是每个连接器类的实例共享,以便它不会为每次使用parallel_query方法创建池.

对类似问题的最佳答案:

从Python多处理访问MySQL连接池

显示与我自己几乎相同的实现,除了使用MySql而不是Postgres.

难道我做错了什么?

谢谢!

编辑:

我找到了这个答案:

Python Postgres psycopg2 ThreadedConnectionPool用尽了

这是非常详细的,看起来好像我误解multiprocessing.Pool了连接池这样的东西ThreadedConnectionPool给了我.但是在第一个链接中它没有提到需要任何连接池等.这个解决方案似乎很好,但似乎有很多代码我认为是一个相当简单的问题?

编辑2:

所以上面的链接解决了另一个问题,我可能会遇到这种问题所以我很高兴我发现了,但它并没有解决最初的问题,即无法使用imap_unordered到酸洗错误.很沮丧.

最后,我认为值得注意的是,它在Heroku中运行,在工作器dyno上运行,使用Redis rq进行调度,后台任务等以及Postgres的托管实例作为数据库.

Lie*_*yan 11

简单地说,postgres 连接和 sqlalchemy 连接池是线程安全的,但它们不是 fork 安全的。

如果你想使用多处理,你应该在fork之后在每个子进程中初始化引擎。

如果要共享引擎,则应改用多线程。

请参阅psycopg2 文档中的线程和进程安全

分叉进程不应使用 libpq 连接,因此在使用诸如 multiprocessing 之类的模块或诸如 FastCGI 之类的分叉 Web 部署方法时,请确保在分叉之后创建连接。

如果您使用 multiprocessing.Pool,则有一个关键字参数初始值设定项,可用于在每个子进程上运行一次代码。尝试这个:

class PostgresConnector(object):
    def __init__(self, db_url):
        self.db_url = db_url
        self.pool = self.init_pool()

    def init_pool(self):
        CPUS = multiprocessing.cpu_count()
        return multiprocessing.Pool(CPUS, initializer=self.init_connection(self.db_url))

    @classmethod
    def init_connection(cls, db_url):
        def _init_connection():
            LOGGER.info('Creating Postgres engine')
            cls.engine = create_engine(db_url)
        return _init_connection

    def run_parallel_queries(self, queries):
        results = []
        try:
            for i in self.pool.imap_unordered(self.execute_parallel_query, queries):
                results.append(i)
        except Exception as exception:
            LOGGER.error('Error whilst executing %s queries in parallel: %s', len(queries), exception)
            raise
        finally:
            pass
            #self.pool.close()
            #self.pool.join()

        LOGGER.info('Parallel query ran producing %s sets of results of type: %s', len(results), type(results))

        return list(chain.from_iterable(results))

    def execute_parallel_query(self, query):
        with self.engine.connect() as conn:
            with conn.begin():
                result = conn.execute(query)
                return result.fetchall()

    def __getstate__(self):
        # this is a hack, if you want to remove this method, you should
        # remove self.pool and just pass pool explicitly
        self_dict = self.__dict__.copy()
        del self_dict['pool']
        return self_dict
Run Code Online (Sandbox Code Playgroud)

现在,解决 XY 问题。

最初它是一个单一的查询,其中的 where id in (...) 包含所有 6k 谓词 ID,但我遇到了查询问题,在它运行的机器上使用了 > 4GB 的 RAM,因此我决定将其拆分为6k 个单独的查询,同步保持稳定的内存使用。

您可能想要做的是以下选项之一:

  1. 编写一个生成所有 6000 个 ID 的子查询,并在原始批量查询中使用该子查询。
  2. 如上所述,但将子查询写为CTE
  3. 如果您的 ID 列表来自外部来源(即不是来自数据库),那么您可以创建一个包含 6000 个 ID 的临时表,然后针对临时表运行原始批量查询

但是,如果你坚持通过 python 运行 6000 个 ID,那么最快的查询很可能既不是一次性完成所有 6000 个 ID(这会耗尽内存),也不是运行 6000 个单独的查询。相反,您可能想要尝试对查询进行分块。例如,一次发送 500 个 ID。您将不得不尝试使用块大小来确定一次可以发送的最大 ID 数,同时仍然在您的内存预算范围内。