wis*_*ter 8 python psycopg2 multiprocessing
我想在向 PostgreSQL 数据库写入数据时让电脑的所有 CPU 核心同时工作。我已把代码精简成一个可以复现该错误的示例,得到的报错如下:
Traceback (most recent call last):
File "test2.py", line 50, in <module>
download_all_sites(sites)
File "test2.py", line 36, in download_all_sites
pool.map(download_site, sites)
File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
raise self._value
psycopg2.OperationalError: SSL error: decryption failed or bad record mac
Run Code Online (Sandbox Code Playgroud)
导致错误的完整代码
import requests
import multiprocessing
import time
import os
import psycopg2
# One PostgreSQL connection and cursor, created as soon as the module is
# imported -- i.e. before download_all_sites() builds the worker pool.
# NOTE(review): download_site() runs inside pool workers yet uses this same
# module-level cursor; this is the setup that yields the reported
# "SSL error: decryption failed or bad record mac".
conn = psycopg2.connect(
    user="user",
    password="pass123",
    host="127.0.0.1",
    port="5432",
    database="my_db",
)
cursor = conn.cursor()

# Placeholder for the requests.Session; filled in by set_global_session().
session = None
def set_global_session():
    """Create the module-level ``requests.Session`` if it is not set yet.

    Passed to ``multiprocessing.Pool`` as ``initializer``.
    """
    global session
    if session:
        # Already initialised; keep the existing session.
        return
    session = requests.Session()
def download_site(domain):
    """Fetch ``http://<domain>`` and insert the keywords found in its body.

    Reads the module-level names ``session``, ``keywords`` and ``cursor``.

    Args:
        domain: Host name to download, without a scheme.
    """
    # NOTE(review): ``cursor`` is the module-level cursor, so every pool
    # worker executes on the one connection opened at import time. This is
    # the call that fails with psycopg2.OperationalError in the report.
    with session.get("http://" + domain) as response:
        page_text = response.text.lower()
        matches = []
        for word in keywords:
            if word in page_text:
                matches.append(word)
        insert_sql = """INSERT INTO test (domain, keyword) VALUES (%s, %s)"""
        cursor.execute(insert_sql, (domain, matches))
def download_all_sites(sites):
    """Map ``download_site`` over ``sites`` in a process pool.

    The pool has ``os.cpu_count()`` workers, each initialised with
    ``set_global_session``.

    Args:
        sites: Iterable of domain names handed to ``download_site``.
    """
    worker_count = os.cpu_count()
    pool = multiprocessing.Pool(
        processes=worker_count,
        initializer=set_global_session,
    )
    with pool:
        # map() blocks until every site is processed and re-raises the
        # first exception raised inside a worker.
        pool.map(download_site, sites)
if __name__ == "__main__":
    # ``keywords`` is read as a global by download_site(), so it has to be
    # bound before the pool is created.
    keywords = ['google', 'success']
    sites = ['google.com'] * 10

    started_at = time.time()
    download_all_sites(sites)
    duration = time.time() - started_at

    # Commit on the module-level connection once all workers are done.
    conn.commit()
    print(f"Finished {len(sites)} in {duration} seconds")
Run Code Online (Sandbox Code Playgroud)
应当在每个工作进程中各自创建新的 postgres 连接。libpq 连接不应该跨 fork 出来的进程使用(而 multiprocessing 正是通过 fork 创建工作进程的),postgres 文档的第二个警告框中提到了这一点。
"""Download sites in parallel and record each one in PostgreSQL.

No database connection is created at import time. Each task opens its own
connection inside the worker process, because a libpq connection must not
be used across a fork().
"""
import multiprocessing
import os
import time
from contextlib import closing

import psycopg2
import requests

# Single source of truth for the connection string used by the workers and
# by the verification query at the end.
DSN = "dbname=mf port=5959 host=localhost user=mf_usr"

session = None


def set_global_session():
    """Pool initializer: create this worker's ``requests.Session`` once."""
    global session
    if not session:
        session = requests.Session()


def download_site(domain):
    """Fetch ``http://<domain>`` and insert ``domain`` into ``mytable``.

    A fresh PostgreSQL connection is opened for every call and is always
    closed again, even when the insert fails.

    Args:
        domain: Host name to download, without a scheme.

    Raises:
        requests.RequestException: If the HTTP request fails.
        psycopg2.Error: If connecting or inserting fails.
    """
    url = "http://" + domain
    with session.get(url):
        # closing() guarantees conn.close(); psycopg2's own ``with conn``
        # only ends the transaction (commit on success, rollback on error).
        with closing(psycopg2.connect(DSN)) as conn:
            with conn, conn.cursor() as cursor:
                query = """INSERT INTO mytable (name) VALUES (%s)"""
                cursor.execute(query, (domain,))


def download_all_sites(sites):
    """Run ``download_site`` for every entry of ``sites`` in a process pool.

    Args:
        sites: Iterable of domain names.
    """
    with multiprocessing.Pool(
        processes=os.cpu_count(), initializer=set_global_session
    ) as pool:
        pool.map(download_site, sites)


if __name__ == "__main__":
    sites = ['google.com'] * 10
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Finished {len(sites)} in {duration} seconds")

    # make sure it worked!
    with closing(psycopg2.connect(DSN)) as conn:
        with conn.cursor() as cursor:
            cursor.execute('select count(name) from mytable')
            # verify 10 downloads == 10 records in database
            print(cursor.fetchall())

# Output of a sample run:
Finished 10 in 0.9922008514404297 seconds
[(10,)]
Run Code Online (Sandbox Code Playgroud)