Phy*_*win 10 python concurrency multithreading multiprocessing
我在用 multiprocessing.Pool()
这是我想要的池:
def insert_and_process(file_to_process,db):
db = DAL("path_to_mysql" + db)
#Table Definations
db.table.insert(**parse_file(file_to_process))
return True
if __name__=="__main__":
file_list=os.listdir(".")
P = Pool(processes=4)
P.map(insert_and_process,file_list,db) # here having problem.
Run Code Online (Sandbox Code Playgroud)
我想传递2个参数我想要做的只是初始化4个DB连接(这里将尝试在每个函数调用上创建连接,因此可能有数百万个连接并导致IO Freezed死亡).如果我可以为每个进程创建4个数据库连接和1个,那就可以.
Pool有什么解决方案吗?还是我应该抛弃它?
编辑:
从你们两个人的帮助下我得到了这个:
args=zip(f,cycle(dbs))
Out[-]:
[('f1', 'db1'),
('f2', 'db2'),
('f3', 'db3'),
('f4', 'db4'),
('f5', 'db1'),
('f6', 'db2'),
('f7', 'db3'),
('f8', 'db4'),
('f9', 'db1'),
('f10', 'db2'),
('f11', 'db3'),
('f12', 'db4')]
Run Code Online (Sandbox Code Playgroud)
所以这里它将如何工作,我将数据库连接代码移动到主要级别并执行此操作:
def process_and_insert(args):
#Table Definations
args[1].table.insert(**parse_file(args[0]))
return True
if __name__=="__main__":
file_list=os.listdir(".")
P = Pool(processes=4)
dbs = [DAL("path_to_mysql/database") for i in range(0,3)]
args=zip(file_list,cycle(dbs))
P.map(insert_and_process,args) # here having problem.
Run Code Online (Sandbox Code Playgroud)
是的,我要测试一下,让你们知道.
jsb*_*eno 26
该Pool文件不说的传递多个参数的目标函数的方式-我已经试过只是路过的序列,但没有得到展现(序列中的一个项目为每个参数).
但是,您可以编写目标函数以期望第一个(也是唯一的)参数为元组,其中每个元素都是您期望的参数之一:
from itertools import repeat
def insert_and_process((file_to_process,db)):
db = DAL("path_to_mysql" + db)
#Table Definations
db.table.insert(**parse_file(file_to_process))
return True
if __name__=="__main__":
file_list=os.listdir(".")
P = Pool(processes=4)
P.map(insert_and_process,zip(file_list,repeat(db)))
Run Code Online (Sandbox Code Playgroud)
(注意-python定义中的额外括号insert_and_process将其视为应该是2项序列的单个参数.序列的第一个元素归属于第一个变量,另一个归属于第二个变量)
您的池将生成四个进程,每个进程由它自己的Python解释器实例运行.您可以使用全局变量来保存数据库连接对象,以便每个进程只创建一个连接:
global_db = None
def insert_and_process(file_to_process, db):
global global_db
if global_db is None:
# If this is the first time this function is called within this
# process, create a new connection. Otherwise, the global variable
# already holds a connection established by a former call.
global_db = DAL("path_to_mysql" + db)
global_db.table.insert(**parse_file(file_to_process))
return True
Run Code Online (Sandbox Code Playgroud)
既然Pool.map()和朋友只支持单参数工作器函数,你需要创建一个转发工作的包装器:
def insert_and_process_helper(args):
return insert_and_process(*args)
if __name__ == "__main__":
file_list=os.listdir(".")
db = "wherever you get your db"
# Create argument tuples for each function call:
jobs = [(file, db) for file in file_list]
P = Pool(processes=4)
P.map(insert_and_process_helper, jobs)
Run Code Online (Sandbox Code Playgroud)
无需使用拉链.例如,如果您有两个参数x和y,并且每个参数都可以获得多个值,例如:
X=range(1,6)
Y=range(10)
Run Code Online (Sandbox Code Playgroud)
该函数应该只获得一个参数,并将其解压缩到:
def func(params):
(x,y)=params
...
Run Code Online (Sandbox Code Playgroud)
你这样称呼它:
params = [(x,y) for x in X for y in Y]
pool.map(func, params)
Run Code Online (Sandbox Code Playgroud)