Eel*_*iet 6 python postgresql multiprocessing python-3.x peewee
我正在尝试编写一个 Python 模型,该模型能够使用多线程模块和 peewee 在 PostgreSQL 数据库中进行一些处理。
但是,在单核模式下,代码可以工作,但是,当我尝试使用多核运行代码时,我遇到了 SSL 错误。
我想发布我的模型的结构,希望有人可以建议如何以适当的方式设置我的模型。目前,我选择使用面向对象的方法,在该方法中我建立一个在池中共享的连接。为了澄清我所做的,我现在将展示我目前拥有的源代码
我有三个文件:main.py、models.py 和 parser.py。内容如下
models.py 定义了 peewee postgresql 表并连接到 postgres 服务器
import peewee as pw
from playhouse.pool import PooledPostgresqlExtDatabase
KVK_KEY = "id_number"
NAME_KEY = "name"
N_VOWELS_KEY = "n_vowels"
# initialise the data base
database = PooledPostgresqlExtDatabase(
"testdb", user="postgres", host="localhost", port=5432, password="xxxx",
max_connections=8, stale_timeout=300 )
class BaseModel(pw.Model):
class Meta:
database = database
only_save_dirty = True
# this class describes the format of the sql data base
class Company(BaseModel):
id_number = pw.IntegerField(primary_key=True)
name = pw.CharField(null=True)
n_vowels = pw.IntegerField(default=-1)
processor = pw.IntegerField(default=-1)
def connect_database(database_name, reset_database=False):
""" connect the database """
database.connect()
if reset_database:
database.drop_tables([Company])
database.create_tables([Company])
Run Code Online (Sandbox Code Playgroud)
parser.py 包含 CompanyParser 类,该类用作代码的引擎来完成所有处理。它生成一些人工数据存储到 postgresql 数据库中,然后使用run方法对已经存储在数据库中的数据进行一些处理
import pandas as pd
import numpy as np
import random
import string
import peewee as pw
from models import (Company, database, KVK_KEY, NAME_KEY)
import multiprocessing as mp
MAX_SQL_CHUNK = 1000
np.random.seed(0)
def random_name(size=8, chars=string.ascii_lowercase):
""" Create a random character string of 'size' characters """
return "".join(random.choice(chars) for _ in range(size))
def vowel_count(characters):
"""
Count the number of vowels in the string 'characters' and return as an integer
"""
count = 0
for char in characters:
if char in list("aeiou"):
count += 1
return count
class CompanyParser(mp.Process):
def __init__(self, number_of_companies=100, i_proc=None,
number_of_procs=1,
first_id=None, last_id=None):
if i_proc is not None and number_of_procs > 1:
mp.Process.__init__(self)
self.i_proc = i_proc
self.number_of_procs = number_of_procs
self.n_companies = number_of_companies
self.data_df: pd.DataFrame = None
self.first_id = first_id
self.last_id = last_id
def generate_data(self):
""" Create a dataframe with fake company data and id's """
id_list = np.random.randint(1000000, 9999999, self.n_companies)
company_list = np.array([random_name() for _ in range(self.n_companies)])
self.data_df = pd.DataFrame(data=np.vstack([id_list, company_list]).T,
columns=[KVK_KEY, NAME_KEY])
self.data_df.sort_values([KVK_KEY], inplace=True)
def store_to_database(self):
"""
Store the company data to a sql database
"""
record_list = list(self.data_df.to_dict(orient="index").values())
n_batch = int(len(record_list) / MAX_SQL_CHUNK) + 1
with database.atomic():
for cnt, batch in enumerate(pw.chunked(record_list, MAX_SQL_CHUNK)):
print(f"writing {cnt}/{n_batch}")
Company.insert_many(batch).execute()
def run(self):
print("Making query at {}".format(self.i_proc))
query = (Company.
select().
where(Company.id_number.between(self.first_id, self.last_id)))
print("Found {} companies".format(query.count()))
for cnt, company in enumerate(query):
print("Processing @ {} - {}: company {}/{}".format(self.i_proc, cnt,
company.id_number,
company.name))
number_of_vowels = vowel_count(company.name)
company.n_vowels = number_of_vowels
company.processor = self.i_proc
print(f"storing number of vowels: {number_of_vowels}")
company.save()
Run Code Online (Sandbox Code Playgroud)
最后,我的主脚本加载存储在 models.py 和 parser.py 中的类并启动代码。
from models import (Company, connect_database)
from parser import CompanyParser
number_of_processors = 2
connect_database(None, reset_database=True)
# init an object of the CompanyParser and use the create database
parser = CompanyParser()
company_ids = Company.select(Company.id_number)
parser.generate_data()
parser.store_to_database()
n_companies = company_ids.count()
n_comp_per_proc = int(n_companies / number_of_processors)
print("Found {} companies: {} per proc".format(n_companies, n_comp_per_proc))
for i_proc in range(number_of_processors):
i_start = i_proc * n_comp_per_proc
first_id = company_ids[i_start]
last_id = company_ids[i_start + n_comp_per_proc - 1]
print(f"Running proc {i_proc} for id {first_id} until id {last_id}")
sub_parser = CompanyParser(first_id=first_id, last_id=last_id,
i_proc=i_proc,
number_of_procs=number_of_processors)
if number_of_processors > 1:
sub_parser.start()
else:
sub_parser.run()
Run Code Online (Sandbox Code Playgroud)
如果number_of_processors = 1,此脚本可以正常工作。它生成人工数据,将其存储到 PostgreSQL 数据库并对数据进行一些处理(它计算名称中的元音数量并将其存储到 n_vowels 列中)
但是,如果我尝试使用number_of_processors = 2 的2 个内核运行此程序,则会遇到以下错误
/opt/miniconda3/bin/python /home/eelco/PycharmProjects/multiproc_peewee/main.py
writing 0/1
Found 100 companies: 50 per proc
Running proc 0 for id 1020737 until id 5295565
Running proc 1 for id 5302405 until id 9891087
Making query at 0
Found 50 companies
Processing @ 0 - 0: company 1020737/wqrbgxiu
storing number of vowels: 2
Making query at 1
Process CompanyParser-1:
Processing @ 0 - 1: company 1086107/lkbagrbc
storing number of vowels: 1
Processing @ 0 - 2: company 1298367/nsdjsqio
storing number of vowels: 2
Traceback (most recent call last):
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2714, in execute_sql
cursor.execute(sql, params or ())
psycopg2.OperationalError: SSL error: sslv3 alert bad record mac
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/miniconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/home/eelco/PycharmProjects/multiproc_peewee/parser.py", line 82, in run
company.save()
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 5748, in save
rows = self.update(**field_dict).where(self._pk_expr()).execute()
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
return method(self, database, *args, **kwargs)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1696, in execute
return self._execute(database)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2121, in _execute
cursor = database.execute(self)
File "/opt/miniconda3/lib/python3.7/site-packages/playhouse/postgres_ext.py", line 468, in execute
cursor = self.execute_sql(sql, params, commit=commit)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2721, in execute_sql
self.commit()
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2512, in __exit__
reraise(new_type, new_type(*exc_args), traceback)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 186, in reraise
raise value.with_traceback(tb)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2714, in execute_sql
cursor.execute(sql, params or ())
peewee.OperationalError: SSL error: sslv3 alert bad record mac
Process CompanyParser-2:
Traceback (most recent call last):
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2714, in execute_sql
cursor.execute(sql, params or ())
psycopg2.OperationalError: SSL error: decryption failed or bad record mac
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/miniconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/home/eelco/PycharmProjects/multiproc_peewee/parser.py", line 72, in run
print("Found {} companies".format(query.count()))
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
return method(self, database, *args, **kwargs)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1881, in count
return Select([clone], [fn.COUNT(SQL('1'))]).scalar(database)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
return method(self, database, *args, **kwargs)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1866, in scalar
row = self.tuples().peek(database)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
return method(self, database, *args, **kwargs)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1853, in peek
rows = self.execute(database)[:n]
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
return method(self, database, *args, **kwargs)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1696, in execute
return self._execute(database)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1847, in _execute
cursor = database.execute(self)
File "/opt/miniconda3/lib/python3.7/site-packages/playhouse/postgres_ext.py", line 468, in execute
cursor = self.execute_sql(sql, params, commit=commit)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2721, in execute_sql
self.commit()
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2512, in __exit__
reraise(new_type, new_type(*exc_args), traceback)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 186, in reraise
raise value.with_traceback(tb)
File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2714, in execute_sql
cursor.execute(sql, params or ())
peewee.OperationalError: SSL error: decryption failed or bad record mac
Process finished with exit code 0
Run Code Online (Sandbox Code Playgroud)
不知何故,一旦第二个线程开始对数据库执行某些操作,就会出现问题。有人建议让这段代码正常工作吗?我已经尝试过以下
希望有人能指教。
问候 Eelco
今天在互联网上进行一番搜索后,我在这里找到了问题的解决方案:github.com/coleifer。正如 coleifer 提到的:显然,在开始连接到数据库之前,您首先必须设置所有分叉。基于这个想法,我修改了我的代码,现在它可以工作了。
对于那些感兴趣的人,我将再次发布我的 python 脚本,以便您可以看到我是如何做到的。这是因为我没有那么多明确的例子,所以也许它可能对其他人有帮助。
首先,所有数据库和 peewee 模块现在都移至初始化函数中,这些函数仅在 CompanyParser 类的构造函数内调用。所以 models.py 看起来像
import peewee as pw
from playhouse.pool import PooledPostgresqlExtDatabase, PostgresqlDatabase, PooledPostgresqlDatabase
KVK_KEY = "id_number"
NAME_KEY = "name"
N_VOWELS_KEY = "n_vowels"
def init_database():
db = PooledPostgresqlDatabase(
"testdb", user="postgres", host="localhost", port=5432, password="xxxxx",
max_connections=8, stale_timeout=300)
return db
def init_models(db, reset_tables=False):
class BaseModel(pw.Model):
class Meta:
database = db
# this class describes the format of the sql data base
class Company(BaseModel):
id_number = pw.IntegerField(primary_key=True)
name = pw.CharField(null=True)
n_vowels = pw.IntegerField(default=-1)
processor = pw.IntegerField(default=-1)
if db.is_closed():
db.connect()
if reset_tables and Company.table_exists():
db.drop_tables([Company])
db.create_tables([Company])
return Company
Run Code Online (Sandbox Code Playgroud)
然后,在 parser.py 脚本中定义工作类 CompanyParser ,如下所示
import multiprocessing as mp
import random
import string
import numpy as np
import pandas as pd
import peewee as pw
from models import (KVK_KEY, NAME_KEY, init_database, init_models)
MAX_SQL_CHUNK = 1000
np.random.seed(0)
def random_name(size=32, chars=string.ascii_lowercase):
""" Create a random character string of 'size' characters """
return "".join(random.choice(chars) for _ in range(size))
def vowel_count(characters):
"""
Count the number of vowels in the string 'characters' and return as an integer
"""
count = 0
for char in characters:
if char in list("aeiou"):
count += 1
return count
class CompanyParser(mp.Process):
def __init__(self, reset_tables=False,
number_of_companies=100, i_proc=None,
number_of_procs=1, first_id=None, last_id=None):
if i_proc is not None and number_of_procs > 1:
mp.Process.__init__(self)
self.i_proc = i_proc
self.reset_tables = reset_tables
self.number_of_procs = number_of_procs
self.n_companies = number_of_companies
self.data_df: pd.DataFrame = None
self.first_id = first_id
self.last_id = last_id
# initialise the database and models
self.database = init_database()
self.Company = init_models(self.database, reset_tables=self.reset_tables)
def generate_data(self):
""" Create a dataframe with fake company data and id's and return the array of id's"""
id_list = np.random.randint(1000000, 9999999, self.n_companies)
company_list = np.array([random_name() for _ in range(self.n_companies)])
self.data_df = pd.DataFrame(data=np.vstack([id_list, company_list]).T,
columns=[KVK_KEY, NAME_KEY])
self.data_df.drop_duplicates([KVK_KEY], inplace=True)
self.data_df.sort_values([KVK_KEY], inplace=True)
return self.data_df[KVK_KEY].values
def store_to_database(self):
"""
Store the company data to a sql database
"""
record_list = list(self.data_df.to_dict(orient="index").values())
n_batch = int(len(record_list) / MAX_SQL_CHUNK) + 1
with self.database.atomic():
for cnt, batch in enumerate(pw.chunked(record_list, MAX_SQL_CHUNK)):
print(f"writing {cnt}/{n_batch}")
self.Company.insert_many(batch).execute()
def run(self):
query = (self.Company.
select().
where(self.Company.id_number.between(self.first_id, self.last_id)))
for cnt, company in enumerate(query):
print("Processing @ {} - {}: company {}/{}".format(self.i_proc, cnt, company.id_number,
company.name))
number_of_vowels = vowel_count(company.name)
company.n_vowels = number_of_vowels
company.processor = self.i_proc
try:
company.save()
except (pw.OperationalError, pw.InterfaceError) as err:
print("failed save for {} {}: {}".format(self.i_proc, cnt, err))
else:
pass
Run Code Online (Sandbox Code Playgroud)
最后,启动进程的 main.py 脚本:
from parser import CompanyParser
import time
def main():
number_of_processors = 2
number_of_companies = 10000
parser = CompanyParser(number_of_companies=number_of_companies, reset_tables=True)
company_ids = parser.generate_data()
parser.store_to_database()
n_companies = company_ids.size
n_comp_per_proc = int(n_companies / number_of_processors)
print("Found {} companies: {} per proc".format(n_companies, n_comp_per_proc))
if not parser.database.is_closed():
parser.database.close()
processes = list()
for i_proc in range(number_of_processors):
i_start = i_proc * n_comp_per_proc
first_id = company_ids[i_start]
last_id = company_ids[i_start + n_comp_per_proc - 1]
print(f"Running proc {i_proc} for id {first_id} until id {last_id}")
sub_parser = CompanyParser(first_id=first_id, last_id=last_id, i_proc=i_proc,
number_of_procs=number_of_processors)
if number_of_processors > 1:
sub_parser.start()
else:
sub_parser.run()
processes.append(sub_parser)
# this blocks the script until all processes are done
for job in processes:
job.join()
# make sure all the connections are closed
for i_proc in range(number_of_processors):
db = processes[i_proc].database
if not db.is_closed():
db.close()
print("Goodbye!")
if __name__ == "__main__":
start = time.time()
main()
duration = time.time() - start
print(f"Done in {duration} s")
Run Code Online (Sandbox Code Playgroud)
正如您所看到的,数据库连接是在类内的每个进程中完成的。这个示例有效,是多处理 + peewee 和 PostgreSQL 的完整示例。希望这可以帮助其他人。如果您有任何改进意见或建议,请告诉我。
| 归档时间: |
|
| 查看次数: |
1700 次 |
| 最近记录: |