Sim*_*482 5 python postgresql multithreading psycopg2
I wanted to speed up one of my tasks, so I wrote a small program:
import psycopg2
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
def write_sim_to_db(all_ids2):
    # NOTE: all worker threads share the module-level connection `conn`
    # and cursor `c` created below.
    if all_ids1[i] != all_ids2:
        c.execute("""SELECT count(*) FROM similarity WHERE prod_id1 = %s AND prod_id2 = %s""", (all_ids1[i], all_ids2,))
        count = c.fetchone()
        if count[0] == 0:
            sim_sum = random.random()
            c.execute("""INSERT INTO similarity(prod_id1, prod_id2, sim_sum)
                         VALUES(%s, %s, %s)""", (all_ids1[i], all_ids2, sim_sum,))
            conn.commit()

conn = psycopg2.connect("dbname='db' user='user' host='localhost' password='pass'")
c = conn.cursor()

all_ids1 = list(n for n in range(1000))
all_ids2_list = list(n for n in range(1000))

for i in range(len(all_ids1)):
    with ThreadPoolExecutor(max_workers=5) as pool:
        results = [pool.submit(write_sim_to_db, i) for i in all_ids2_list]
For a while the program runs fine, but then I get an error:
Segmentation fault (core dumped)
or
*** Error in `python3': double free or corruption (out): 0x00007fe574002270 ***
Aborted (core dumped)
If I run the program with a single thread, it works fine:
with ThreadPoolExecutor(max_workers=1) as pool:
It seems like PostgreSQL doesn't have time to process the transactions, but I'm not sure; there are no errors in the log files. I don't know how to track down this error. Help.
I had to use a connection pool:
import psycopg2
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from psycopg2.pool import ThreadedConnectionPool
def write_sim_to_db(all_ids2):
    if all_ids1[i] != all_ids2:
        # Each task borrows its own connection from the pool and returns it
        # when done, instead of sharing one connection across threads.
        conn = tcp.getconn()
        c = conn.cursor()
        c.execute("""SELECT count(*) FROM similarity WHERE prod_id1 = %s AND prod_id2 = %s""", (all_ids1[i], all_ids2,))
        count = c.fetchone()
        if count[0] == 0:
            sim_sum = random.random()
            c.execute("""INSERT INTO similarity(prod_id1, prod_id2, sim_sum)
                         VALUES(%s, %s, %s)""", (all_ids1[i], all_ids2, sim_sum,))
            conn.commit()
        tcp.putconn(conn)

DSN = "postgresql://user:pass@localhost/db"
tcp = ThreadedConnectionPool(1, 10, DSN)

all_ids1 = list(n for n in range(1000))
all_ids2_list = list(n for n in range(1000))

for i in range(len(all_ids1)):
    with ThreadPoolExecutor(max_workers=2) as pool:
        results = [pool.submit(write_sim_to_db, i) for i in all_ids2_list]
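One thing the pooled version does not guard against: if a query raises, tcp.putconn(conn) is never reached and that connection leaks out of the pool. Below is a minimal sketch (not from the original post; pooled_conn is a hypothetical helper name) that wraps getconn/putconn in a context manager so the connection is always returned:

from contextlib import contextmanager

@contextmanager
def pooled_conn(pool):
    # Borrow a connection and always hand it back, even if the body raises.
    conn = pool.getconn()
    try:
        yield conn
    finally:
        pool.putconn(conn)

def write_sim_to_db(all_ids2):
    if all_ids1[i] != all_ids2:
        with pooled_conn(tcp) as conn:
            c = conn.cursor()
            c.execute("SELECT count(*) FROM similarity WHERE prod_id1 = %s AND prod_id2 = %s",
                      (all_ids1[i], all_ids2))
            if c.fetchone()[0] == 0:
                c.execute("INSERT INTO similarity(prod_id1, prod_id2, sim_sum) VALUES (%s, %s, %s)",
                          (all_ids1[i], all_ids2, random.random()))
                conn.commit()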
This is the sensible way to get the speed-up. It will be faster and simpler than your code:
tuple_list = []
for p1 in range(3):
    for p2 in range(3):
        if p1 == p2: continue
        tuple_list.append((p1, p2, random.random()))

insert = """
insert into similarity (prod_id1, prod_id2, sim_sum)
select prod_id1, prod_id2, i.sim_sum
from
    (values
    {}
    ) i (prod_id1, prod_id2, sim_sum)
    left join
    similarity s using (prod_id1, prod_id2)
where s is null
""".format(',\n    '.join(['%s'] * len(tuple_list)))

print(cur.mogrify(insert, tuple_list).decode())
cur.execute(insert, tuple_list)
Output:
insert into similarity (prod_id1, prod_id2, sim_sum)
select prod_id1, prod_id2, i.sim_sum
from
    (values
    (0, 1, 0.7316830646236253),
    (0, 2, 0.36642199082207805),
    (1, 0, 0.9830936499726003),
    (1, 2, 0.1401200246162232),
    (2, 0, 0.9921581283868096),
    (2, 1, 0.47250175432277497)
    ) i (prod_id1, prod_id2, sim_sum)
    left join
    similarity s using (prod_id1, prod_id2)
where s is null
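If tuple_list grows large, psycopg2's psycopg2.extras.execute_values helper can expand a single %s placeholder into pages of rows. A sketch with the same insert-if-missing statement (page_size=100 is just an assumption to tune):

from psycopg2.extras import execute_values

insert = """
insert into similarity (prod_id1, prod_id2, sim_sum)
select prod_id1, prod_id2, i.sim_sum
from (values %s) i (prod_id1, prod_id2, sim_sum)
left join similarity s using (prod_id1, prod_id2)
where s is null
"""
# execute_values substitutes batches of rows from tuple_list for the single %s,
# sending the statement once per page of 100 rows.
execute_values(cur, insert, tuple_list, page_size=100)
cur.connection.commit()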
By the way, Python isn't needed here at all; this can all be done with a single plain SQL query.
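For the 0..999 ID ranges in the question, such a statement could look something like the sketch below (my guess at the intent, not taken from the answer), run directly in psql, using generate_series and PostgreSQL's own random():

-- Insert every missing ordered pair with a random sim_sum, no Python loop.
insert into similarity (prod_id1, prod_id2, sim_sum)
select p1, p2, random()
from generate_series(0, 999) p1
cross join generate_series(0, 999) p2
where p1 <> p2
  and not exists (select 1
                  from similarity s
                  where s.prod_id1 = p1
                    and s.prod_id2 = p2);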