python multiprocessing + peewee + postgresql 因 SSL 错误而失败

Eel*_*iet 6 python postgresql multiprocessing python-3.x peewee

我正在尝试编写一个 Python 程序,使用 multiprocessing(多进程)模块和 peewee 在 PostgreSQL 数据库中并行地进行一些处理。

在单核模式下代码可以正常工作,但当我尝试使用多核(多个进程)运行代码时,遇到了 SSL 错误。

下面贴出我的程序结构,希望有人能建议如何正确地组织它。目前我采用面向对象的方法:建立一个数据库连接池,并在各个进程之间共享。为了说明我的做法,下面是我目前的源代码。

我有三个文件:main.py、models.py 和 parser.py。内容如下

models.py 定义 peewee 的 PostgreSQL 表模型,并连接到 PostgreSQL 服务器:

import peewee as pw
from playhouse.pool import PooledPostgresqlExtDatabase

# Keys matching the Company field names; parser.py uses them as DataFrame columns.
KVK_KEY, NAME_KEY, N_VOWELS_KEY = "id_number", "name", "n_vowels"

# initialise the data base
# Options of the shared connection pool (max_connections / stale_timeout are
# playhouse.pool settings).
_POOL_OPTIONS = dict(
    user="postgres",
    host="localhost",
    port=5432,
    password="xxxx",
    max_connections=8,
    stale_timeout=300,
)

# initialise the data base: one pooled database object shared by all models
database = PooledPostgresqlExtDatabase("testdb", **_POOL_OPTIONS)


class BaseModel(pw.Model):
    """Common base class that binds every model to the shared ``database``."""

    class Meta:
        # save() only writes the fields that were modified
        only_save_dirty = True
        database = database


# this class describes the format of the sql data base
class Company(BaseModel):
    """One row per company, keyed by its id number."""

    # NOTE(review): keep the field order; it presumably determines the column
    # order of the created table.
    id_number = pw.IntegerField(primary_key=True)  # primary key
    name = pw.CharField(null=True)  # company name; may be NULL
    # -1 until CompanyParser.run() stores the vowel count of `name`
    n_vowels = pw.IntegerField(default=-1)
    # -1 until CompanyParser.run() stores the index (i_proc) of the worker
    processor = pw.IntegerField(default=-1)


def connect_database(database_name, reset_database=False):
    """Connect the shared ``database`` and make sure the Company table exists.

    Args:
        database_name: Unused. The database name is fixed where ``database``
            is created; the parameter is kept for existing callers.
        reset_database: If True, drop the Company table before creating it.

    NOTE(review): the connection opened here belongs to the calling process.
    Processes forked afterwards inherit this same connection; using it from
    several processes at once matches the SSL "bad record mac" errors
    reported for this code.
    """
    # database.connect() raises if a connection is already open, so only
    # connect when needed; this makes calling the function twice safe.
    if database.is_closed():
        database.connect()
    if reset_database:
        database.drop_tables([Company])
    database.create_tables([Company])
Run Code Online (Sandbox Code Playgroud)

parser.py 包含 CompanyParser 类,它是完成所有处理的核心。它先生成一些人工数据并存入 PostgreSQL 数据库,然后通过 run 方法处理已存储在数据库中的数据:

import pandas as pd
import numpy as np
import random
import string
import peewee as pw
from models import (Company, database, KVK_KEY, NAME_KEY)
import multiprocessing as mp

MAX_SQL_CHUNK = 1000  # maximum number of rows per INSERT statement

# Seed both random generators so the fake data is reproducible: numpy draws
# the ids, while the `random` module draws the names (see random_name).
np.random.seed(0)
random.seed(0)


def random_name(size=8, chars=string.ascii_lowercase):
    """Return a string of ``size`` characters picked at random from ``chars``.

    One ``random.choice`` call is made per character, in order.
    """
    picked = []
    for _ in range(size):
        picked.append(random.choice(chars))
    return "".join(picked)


def vowel_count(characters):
    """
    Count the lowercase vowels ("a", "e", "i", "o", "u") in 'characters' and
    return the count as an integer.

    ``None`` counts as having no vowels instead of raising TypeError
    (Company.name is a nullable column). Uppercase vowels are not counted.
    """
    if characters is None:
        return 0
    # Build the lookup set once, instead of a new list for every character.
    vowels = frozenset("aeiou")
    return sum(1 for char in characters if char in vowels)


class CompanyParser(mp.Process):
    """Generate fake company data and count the vowels in company names.

    One instance generates and stores the data. Worker instances get an
    inclusive ``first_id``..``last_id`` range and process it in ``run()``,
    either in the current process or in a child started with ``start()``.
    """

    def __init__(self, number_of_companies=100, i_proc=None,
                 number_of_procs=1,
                 first_id=None, last_id=None):
        # Only instances meant to be start()-ed are set up as processes; the
        # others are used by calling their methods (including run()) directly.
        if i_proc is not None and number_of_procs > 1:
            super().__init__()

        self.i_proc = i_proc
        self.number_of_procs = number_of_procs
        self.n_companies = number_of_companies
        self.data_df: pd.DataFrame = None

        self.first_id = first_id
        self.last_id = last_id

    def generate_data(self):
        """Create a dataframe with fake company data and id's, sorted by id.

        Duplicate ids are dropped because they would violate the primary key
        of the Company table. np.vstack turns ids and names into one string
        array, so the ids are strings; sorting them is still numeric because
        every id has 7 digits.
        """
        id_list = np.random.randint(1000000, 9999999, self.n_companies)
        company_list = np.array([random_name() for _ in range(self.n_companies)])
        self.data_df = pd.DataFrame(data=np.vstack([id_list, company_list]).T,
                                    columns=[KVK_KEY, NAME_KEY])
        self.data_df.drop_duplicates([KVK_KEY], inplace=True)
        self.data_df.sort_values([KVK_KEY], inplace=True)

    def store_to_database(self):
        """
        Store the company data to a sql database, MAX_SQL_CHUNK rows per
        INSERT, all inside a single transaction.
        """
        record_list = self.data_df.to_dict(orient="records")

        # ceiling division: the number of chunks pw.chunked() will yield
        n_batch = (len(record_list) + MAX_SQL_CHUNK - 1) // MAX_SQL_CHUNK

        with database.atomic():
            for cnt, batch in enumerate(pw.chunked(record_list, MAX_SQL_CHUNK)):
                print(f"writing {cnt}/{n_batch}")
                Company.insert_many(batch).execute()

    def run(self):
        """Count the vowels of every company with first_id <= id <= last_id
        and store the count and this worker's index back into the table."""
        print("Making query at {}".format(self.i_proc))
        # NOTE(review): `database` is the module-level pool from models.py.
        # In a child started with start() (fork), it presumably reuses the
        # connection the parent opened in connect_database(), and every child
        # shares that one connection. This matches the SSL "bad record mac"
        # errors reported for this code.
        query = (Company
                 .select()
                 .where(Company.id_number.between(self.first_id, self.last_id)))
        print("Found {} companies".format(query.count()))

        for cnt, company in enumerate(query):
            print("Processing @ {} - {}:  company {}/{}".format(self.i_proc, cnt,
                                                                company.id_number,
                                                                company.name))
            number_of_vowels = vowel_count(company.name)
            company.n_vowels = number_of_vowels
            company.processor = self.i_proc
            print(f"storing number of vowels: {number_of_vowels}")
            company.save()
Run Code Online (Sandbox Code Playgroud)

最后,主脚本 main.py 导入 models.py 和 parser.py 中定义的类并启动处理:

from models import (Company, connect_database)
from parser import CompanyParser

number_of_processors = 2
connect_database(None, reset_database=True)

# init an object of the CompanyParser to generate and store the data
parser = CompanyParser()

# Order by id: consecutive slices of this query are handed to the workers as
# "first_id .. last_id" ranges, which is only correct for sorted ids.
company_ids = Company.select(Company.id_number).order_by(Company.id_number)
parser.generate_data()
parser.store_to_database()

n_companies = company_ids.count()
n_comp_per_proc = n_companies // number_of_processors
print("Found {} companies: {} per proc".format(n_companies, n_comp_per_proc))

# NOTE(review): connect_database() above opened a connection in this process.
# Children started with start() (fork) all inherit that same connection and
# use it concurrently, which is what produces the SSL errors shown below.
workers = []
for i_proc in range(number_of_processors):
    i_start = i_proc * n_comp_per_proc
    # The last process also takes the remainder of the integer division;
    # otherwise the highest ids would never be processed.
    if i_proc == number_of_processors - 1:
        i_stop = n_companies
    else:
        i_stop = i_start + n_comp_per_proc
    if i_start >= i_stop:
        continue  # fewer companies than processes: nothing for this one
    first_id = company_ids[i_start]
    last_id = company_ids[i_stop - 1]

    print(f"Running proc {i_proc} for id {first_id} until id {last_id}")
    sub_parser = CompanyParser(first_id=first_id, last_id=last_id,
                               i_proc=i_proc,
                               number_of_procs=number_of_processors)

    if number_of_processors > 1:
        sub_parser.start()
        workers.append(sub_parser)
    else:
        sub_parser.run()

# wait for all started child processes to finish
for worker in workers:
    worker.join()
Run Code Online (Sandbox Code Playgroud)

如果 number_of_processors = 1,此脚本可以正常工作:它生成人工数据,将其存储到 PostgreSQL 数据库,并对数据进行处理(计算名称中元音的数量并存入 n_vowels 列)。

但是,如果设置 number_of_processors = 2,用 2 个核心运行此程序,就会遇到以下错误:

/opt/miniconda3/bin/python /home/eelco/PycharmProjects/multiproc_peewee/main.py
writing 0/1
Found 100 companies: 50 per proc
Running proc 0 for id 1020737 until id 5295565
Running proc 1 for id 5302405 until id 9891087
Making query at 0
Found 50 companies
Processing @ 0 - 0:  company 1020737/wqrbgxiu
storing number of vowels: 2
Making query at 1
Process CompanyParser-1:
Processing @ 0 - 1:  company 1086107/lkbagrbc
storing number of vowels: 1
Processing @ 0 - 2:  company 1298367/nsdjsqio
storing number of vowels: 2
Traceback (most recent call last):
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2714, in execute_sql
    cursor.execute(sql, params or ())
psycopg2.OperationalError: SSL error: sslv3 alert bad record mac


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/miniconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/eelco/PycharmProjects/multiproc_peewee/parser.py", line 82, in run
    company.save()
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 5748, in save
    rows = self.update(**field_dict).where(self._pk_expr()).execute()
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
    return method(self, database, *args, **kwargs)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1696, in execute
    return self._execute(database)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2121, in _execute
    cursor = database.execute(self)
  File "/opt/miniconda3/lib/python3.7/site-packages/playhouse/postgres_ext.py", line 468, in execute
    cursor = self.execute_sql(sql, params, commit=commit)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2721, in execute_sql
    self.commit()
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2512, in __exit__
    reraise(new_type, new_type(*exc_args), traceback)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 186, in reraise
    raise value.with_traceback(tb)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2714, in execute_sql
    cursor.execute(sql, params or ())
peewee.OperationalError: SSL error: sslv3 alert bad record mac

Process CompanyParser-2:
Traceback (most recent call last):
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2714, in execute_sql
    cursor.execute(sql, params or ())
psycopg2.OperationalError: SSL error: decryption failed or bad record mac


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/miniconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/eelco/PycharmProjects/multiproc_peewee/parser.py", line 72, in run
    print("Found {} companies".format(query.count()))
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
    return method(self, database, *args, **kwargs)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1881, in count
    return Select([clone], [fn.COUNT(SQL('1'))]).scalar(database)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
    return method(self, database, *args, **kwargs)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1866, in scalar
    row = self.tuples().peek(database)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
    return method(self, database, *args, **kwargs)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1853, in peek
    rows = self.execute(database)[:n]
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
    return method(self, database, *args, **kwargs)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1696, in execute
    return self._execute(database)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1847, in _execute
    cursor = database.execute(self)
  File "/opt/miniconda3/lib/python3.7/site-packages/playhouse/postgres_ext.py", line 468, in execute
    cursor = self.execute_sql(sql, params, commit=commit)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2721, in execute_sql
    self.commit()
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2512, in __exit__
    reraise(new_type, new_type(*exc_args), traceback)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 186, in reraise
    raise value.with_traceback(tb)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2714, in execute_sql
    cursor.execute(sql, params or ())
peewee.OperationalError: SSL error: decryption failed or bad record mac


Process finished with exit code 0
Run Code Online (Sandbox Code Playgroud)

看起来,一旦第二个进程开始对数据库执行操作,就会出现问题。有人能建议如何让这段代码正常工作吗?我已经尝试过以下方法:

  • 分别使用 PooledPostgresqlDatabase 和普通的 PostgresqlDatabase 连接数据库,都会导致同样的错误。
  • 使用 sqlite 代替 postgres。这在 2 个核心下可以工作,但前提是两个进程之间的冲突不多;否则会遇到锁定问题。我的印象是 postgres 比 sqlite 更适合多进程并发访问(是这样吗?)
  • 如果在启动第一个进程后暂停(相当于实际上只使用一个核心),代码可以正常工作,这说明 start 方法的调用是正确的。

希望有人能指教。

问候 Eelco

Eel*_*iet 5

今天在网上搜索了一番后,我在 coleifer 的 GitHub(github.com/coleifer)上找到了解决方案。正如 coleifer 所指出的:必须先完成所有进程的 fork,然后再连接数据库——已经打开的连接在 fork 之后不能被多个进程同时使用。基于这个思路,我修改了代码,现在它可以正常工作了。

为了方便感兴趣的读者,我把修改后的 Python 脚本贴在下面,以便大家看到具体做法。因为我没有找到太多清晰的示例,希望这能对其他人有所帮助。

首先,数据库和 peewee 模型的创建现在都移到了初始化函数中,这些函数只在 CompanyParser 类的构造函数中调用。修改后的 models.py 如下:

import peewee as pw
from playhouse.pool import PooledPostgresqlExtDatabase, PostgresqlDatabase, PooledPostgresqlDatabase

# Keys matching the Company field names; parser.py uses them as DataFrame columns.
KVK_KEY, NAME_KEY, N_VOWELS_KEY = "id_number", "name", "n_vowels"


def init_database():
    """Create and return a new pooled PostgreSQL database object.

    Every call builds an independent pool (max_connections=8,
    stale_timeout=300). Connecting is left to the caller.
    """
    return PooledPostgresqlDatabase(
        "testdb",
        user="postgres",
        password="xxxxx",
        host="localhost",
        port=5432,
        max_connections=8,
        stale_timeout=300,
    )


def init_models(db, reset_tables=False):
    """Define the peewee models bound to ``db`` and return the Company model.

    Args:
        db: database object the models are bound to (e.g. from init_database()).
        reset_tables: if True, drop an existing Company table before creating it.

    Opens a connection on ``db`` when it has none and leaves it open; the
    Company table is created if it does not exist yet.
    """

    class BaseModel(pw.Model):
        class Meta:
            database = db

    # this class describes the format of the sql data base
    class Company(BaseModel):
        id_number = pw.IntegerField(primary_key=True)
        name = pw.CharField(null=True)
        n_vowels = pw.IntegerField(default=-1)
        processor = pw.IntegerField(default=-1)

    models = [Company]
    if db.is_closed():
        db.connect()
    must_drop = reset_tables and Company.table_exists()
    if must_drop:
        db.drop_tables(models)
    db.create_tables(models)

    return Company
Run Code Online (Sandbox Code Playgroud)

然后,在 parser.py 中定义工作类 CompanyParser,如下所示:

import multiprocessing as mp
import random
import string

import numpy as np
import pandas as pd
import peewee as pw

from models import (KVK_KEY, NAME_KEY, init_database, init_models)

MAX_SQL_CHUNK = 1000  # maximum number of rows per INSERT statement

# Seed both random generators so the fake data is reproducible: numpy draws
# the ids, while the `random` module draws the names (see random_name).
np.random.seed(0)
random.seed(0)


def random_name(size=32, chars=string.ascii_lowercase):
    """Return a string of ``size`` characters picked at random from ``chars``.

    One ``random.choice`` call is made per character, in order.
    """
    picked = []
    for _ in range(size):
        picked.append(random.choice(chars))
    return "".join(picked)


def vowel_count(characters):
    """
    Count the lowercase vowels ("a", "e", "i", "o", "u") in 'characters' and
    return the count as an integer.

    ``None`` counts as having no vowels instead of raising TypeError
    (Company.name is a nullable column). Uppercase vowels are not counted.
    """
    if characters is None:
        return 0
    # Build the lookup set once, instead of a new list for every character.
    vowels = frozenset("aeiou")
    return sum(1 for char in characters if char in vowels)


class CompanyParser(mp.Process):
    """Generate fake company data and count the vowels in company names.

    Every instance builds its own database object and Company model in
    ``__init__``. The instance created with ``reset_tables=True`` generates
    and stores the data. Worker instances get an inclusive
    ``first_id``..``last_id`` range and process it in ``run()``, either in the
    current process or in a child started with ``start()``.
    """

    def __init__(self, reset_tables=False,
                 number_of_companies=100, i_proc=None,
                 number_of_procs=1, first_id=None, last_id=None):
        # Only instances meant to be start()-ed are set up as processes; the
        # others are used by calling their methods (including run()) directly.
        if i_proc is not None and number_of_procs > 1:
            super().__init__()

        self.i_proc = i_proc
        self.reset_tables = reset_tables

        self.number_of_procs = number_of_procs
        self.n_companies = number_of_companies
        self.data_df: pd.DataFrame = None

        self.first_id = first_id
        self.last_id = last_id

        # initialise the database and models
        # NOTE(review): __init__ runs in the process that creates the instance,
        # which is the parent for workers started with start(), and
        # init_models() connects right away. The child therefore inherits a
        # connection opened before the fork. That is safe only because no
        # other process uses this instance's connection afterwards; opening
        # the connection inside run() would not rely on that.
        self.database = init_database()
        self.Company = init_models(self.database, reset_tables=self.reset_tables)

    def generate_data(self):
        """Create a dataframe with fake company data and return the array of ids.

        Duplicate ids are dropped because they would violate the primary key,
        and the rows are sorted by id. np.vstack turns ids and names into one
        string array, so the ids are strings; sorting them is still numeric
        because every id has 7 digits.
        """
        id_list = np.random.randint(1000000, 9999999, self.n_companies)
        company_list = np.array([random_name() for _ in range(self.n_companies)])
        self.data_df = pd.DataFrame(data=np.vstack([id_list, company_list]).T,
                                    columns=[KVK_KEY, NAME_KEY])
        self.data_df.drop_duplicates([KVK_KEY], inplace=True)
        self.data_df.sort_values([KVK_KEY], inplace=True)
        return self.data_df[KVK_KEY].values

    def store_to_database(self):
        """
        Store the company data to a sql database, MAX_SQL_CHUNK rows per
        INSERT, all inside a single transaction.
        """
        record_list = self.data_df.to_dict(orient="records")

        # ceiling division: the number of chunks pw.chunked() will yield
        n_batch = (len(record_list) + MAX_SQL_CHUNK - 1) // MAX_SQL_CHUNK

        with self.database.atomic():
            for cnt, batch in enumerate(pw.chunked(record_list, MAX_SQL_CHUNK)):
                print(f"writing {cnt}/{n_batch}")
                self.Company.insert_many(batch).execute()

    def run(self):
        """Count the vowels of every company with first_id <= id <= last_id
        and store the count and this worker's index back into the table.

        A failing save is reported and skipped; processing continues with the
        next company.
        """
        query = (self.Company
                 .select()
                 .where(self.Company.id_number.between(self.first_id, self.last_id)))

        for cnt, company in enumerate(query):
            print("Processing @ {} - {}:  company {}/{}".format(self.i_proc, cnt, company.id_number,
                                                                company.name))
            company.n_vowels = vowel_count(company.name)
            company.processor = self.i_proc
            try:
                company.save()
            except (pw.OperationalError, pw.InterfaceError) as err:
                # best effort: report the failed row and carry on
                print("failed save for {} {}: {}".format(self.i_proc, cnt, err))
Run Code Online (Sandbox Code Playgroud)

最后,启动进程的 main.py 脚本:

from parser import CompanyParser
import time


def main():
    number_of_processors = 2
    number_of_companies = 10000

    parser = CompanyParser(number_of_companies=number_of_companies, reset_tables=True)
    company_ids = parser.generate_data()
    parser.store_to_database()

    n_companies = company_ids.size
    n_comp_per_proc = int(n_companies / number_of_processors)
    print("Found {} companies: {} per proc".format(n_companies, n_comp_per_proc))
    if not parser.database.is_closed():
        parser.database.close()

    processes = list()
    for i_proc in range(number_of_processors):
        i_start = i_proc * n_comp_per_proc
        first_id = company_ids[i_start]
        last_id = company_ids[i_start + n_comp_per_proc - 1]

        print(f"Running proc {i_proc} for id {first_id} until id {last_id}")

        sub_parser = CompanyParser(first_id=first_id, last_id=last_id, i_proc=i_proc,
                                   number_of_procs=number_of_processors)

        if number_of_processors > 1:
            sub_parser.start()
        else:
            sub_parser.run()

        processes.append(sub_parser)

    # this blocks the script until all processes are done
    for job in processes:
        job.join()

    # make sure all the connections are closed
    for i_proc in range(number_of_processors):
        db = processes[i_proc].database
        if not db.is_closed():
            db.close()
    print("Goodbye!")


if __name__ == "__main__":
    # perf_counter() is a monotonic clock intended for measuring elapsed time;
    # time.time() can jump when the system clock is adjusted.
    start = time.perf_counter()
    main()
    duration = time.perf_counter() - start
    print(f"Done in {duration} s")
Run Code Online (Sandbox Code Playgroud)

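可以看到,每个 CompanyParser 实例都会创建自己的数据库对象和连接。需要注意的是,构造函数 __init__ 是在父进程中执行的,因此这些连接实际上在 fork 之前就已在父进程中打开,随后由各个子进程继承并各自独占使用——之所以能正常工作,是因为没有任何一个连接被多个进程同时使用;更稳妥的做法是在 run() 中(即子进程内部)再建立连接。这个示例可以运行,是 multiprocessing + peewee + PostgreSQL 的完整示例,希望能帮到其他人。如果您有任何改进意见或建议,请告诉我。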