多重处理中解密失败或坏记录 mac

wis*_*ter 8 python psycopg2 multiprocessing

我试图让所有 PC 核心在填充 PostgreSQL 数据库时同时工作,我编辑了代码以产生我所得到的可重现错误

Traceback (most recent call last):
  File "test2.py", line 50, in <module>
    download_all_sites(sites)
  File "test2.py", line 36, in download_all_sites
    pool.map(download_site, sites)
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
psycopg2.OperationalError: SSL error: decryption failed or bad record mac
Run Code Online (Sandbox Code Playgroud)

导致错误的完整代码

import requests
import multiprocessing
import time
import os
import psycopg2
session = None
conn = psycopg2.connect(user="user",
                        password="pass123",
                        host="127.0.0.1",
                        port="5432",
                        database="my_db")
cursor = conn.cursor()


def set_global_session():
    global session
    if not session:
        session = requests.Session()


def download_site(domain):
    url = "http://" + domain
    with session.get(url) as response:
        temp = response.text.lower()
        found = [i for i in keywords if i in temp]
        query = """INSERT INTO test (domain, keyword) VALUES (%s, %s)"""
        cursor.execute(query, (domain, found))


def download_all_sites(sites):
    with multiprocessing.Pool(processes=os.cpu_count(), initializer=set_global_session) as pool:
        pool.map(download_site, sites)


if __name__ == "__main__":
    sites = ['google.com'] * 10
    keywords = ['google', 'success']
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    conn.commit()
    print(f"Finished {len(sites)} in {duration} seconds")
Run Code Online (Sandbox Code Playgroud)

Mau*_*yer 8

为每个多进程创建一个新的 postgres 连接。Libpq连接不应该\xe2\x80\x99 与分叉进程一起使用(多处理正在执行的操作),在postgres 文档的第二个警告框中提到了这一点。

\n
import requests\nimport multiprocessing\nimport time\nimport os\nimport psycopg2\nsession = None    \n\ndef set_global_session():\n    global session\n    if not session:\n        session = requests.Session()\n\n\ndef download_site(domain):\n    url = "http://" + domain\n    with session.get(url) as response:\n        #temp = response.text.lower()\n        #found = [i for i in keywords if i in temp]\n        #query = """INSERT INTO test (domain, keyword) VALUES (%s, %s)"""\n        conn = psycopg2.connect(\n            "dbname=mf port=5959 host=localhost user=mf_usr"\n        )\n        cursor = conn.cursor()\n        query = """INSERT INTO mytable (name) VALUES (%s)"""\n        cursor.execute(query, (domain, ))\n        conn.commit()\n        conn.close()\n\n\ndef download_all_sites(sites):\n    with multiprocessing.Pool(\n        processes=os.cpu_count(), initializer=set_global_session\n    ) as pool:\n        pool.map(download_site, sites)\n\n\nif __name__ == "__main__":\n    sites = [\'google.com\'] * 10\n    keywords = [\'google\', \'success\']\n    start_time = time.time()\n    download_all_sites(sites)\n    duration = time.time() - start_time\n    print(f"Finished {len(sites)} in {duration} seconds")\n\n    # make sure it worked!\n    conn = psycopg2.connect("dbname=mf port=5959 host=localhost user=mf_usr")\n    cursor = conn.cursor()\n    cursor.execute(\'select count(name) from mytable\')\n    print(cursor.fetchall())  # verify 10 downloads == 10 records in database\n
Run Code Online (Sandbox Code Playgroud)\n

出去:

\n
Finished 10 in 0.9922008514404297 seconds\n[(10,)]\n
Run Code Online (Sandbox Code Playgroud)\n