我正在尝试使用多进程池对象.我希望每个进程在启动时打开数据库连接,然后使用该连接来处理传入的数据.(而不是打开和关闭每个数据位的连接.)这似乎是初始化程序是什么因为,但我无法理解工人和初始化者的沟通方式.所以我有这样的事情:
def get_cursor():
return psycopg2.connect(...).cursor()
def process_data(data):
# here I'd like to have the cursor so that I can do things with the data
if __name__ == "__main__":
pool = Pool(initializer=get_cursor, initargs=())
pool.map(process_data, get_some_data_iterator())
Run Code Online (Sandbox Code Playgroud)
我如何(或我)将光标从get_cursor()返回到process_data()?
我在用 multiprocessing.Pool()
这是我想要的池:
def insert_and_process(file_to_process,db):
db = DAL("path_to_mysql" + db)
#Table Definations
db.table.insert(**parse_file(file_to_process))
return True
if __name__=="__main__":
file_list=os.listdir(".")
P = Pool(processes=4)
P.map(insert_and_process,file_list,db) # here having problem.
Run Code Online (Sandbox Code Playgroud)
我想传递2个参数我想要做的只是初始化4个DB连接(这里将尝试在每个函数调用上创建连接,因此可能有数百万个连接并导致IO Freezed死亡).如果我可以为每个进程创建4个数据库连接和1个,那就可以.
Pool有什么解决方案吗?还是我应该抛弃它?
编辑:
从你们两个人的帮助下我得到了这个:
args=zip(f,cycle(dbs))
Out[-]:
[('f1', 'db1'),
('f2', 'db2'),
('f3', 'db3'),
('f4', 'db4'),
('f5', 'db1'),
('f6', 'db2'),
('f7', 'db3'),
('f8', 'db4'),
('f9', 'db1'),
('f10', 'db2'),
('f11', 'db3'),
('f12', 'db4')]
Run Code Online (Sandbox Code Playgroud)
所以这里它将如何工作,我将数据库连接代码移动到主要级别并执行此操作:
def process_and_insert(args):
#Table Definations
args[1].table.insert(**parse_file(args[0]))
return True
if __name__=="__main__":
file_list=os.listdir(".")
P = Pool(processes=4)
dbs = [DAL("path_to_mysql/database") for i in range(0,3)]
args=zip(file_list,cycle(dbs))
P.map(insert_and_process,args) …Run Code Online (Sandbox Code Playgroud)