相关疑难解决方法(0)

如何使用初始化程序来设置我的多进程池?

我正在尝试使用多进程池对象.我希望每个进程在启动时打开数据库连接,然后使用该连接来处理传入的数据.(而不是打开和关闭每个数据位的连接.)这似乎是初始化程序是什么因为,但我无法理解工人和初始化者的沟通方式.所以我有这样的事情:

def get_cursor():
  return psycopg2.connect(...).cursor()

def process_data(data):
   # here I'd like to have the cursor so that I can do things with the data

if __name__ == "__main__":
  pool = Pool(initializer=get_cursor, initargs=())
  pool.map(process_data, get_some_data_iterator())
Run Code Online (Sandbox Code Playgroud)

我如何(或我)将光标从get_cursor()返回到process_data()?

python multiprocessing

51
推荐指数
4
解决办法
2万
查看次数

multiprocessing.pool.map和带有两个参数的函数

我在用 multiprocessing.Pool()

这是我想要的池:

def insert_and_process(file_to_process,db):
    db = DAL("path_to_mysql" + db)
    #Table Definations
    db.table.insert(**parse_file(file_to_process))
    return True

if __name__=="__main__":
    file_list=os.listdir(".")
    P = Pool(processes=4)
    P.map(insert_and_process,file_list,db) # here having problem.
Run Code Online (Sandbox Code Playgroud)

我想传递2个参数我想要做的只是初始化4个DB连接(这里将尝试在每个函数调用上创建连接,因此可能有数百万个连接并导致IO Freezed死亡).如果我可以为每个进程创建4个数据库连接和1个,那就可以.

Pool有什么解决方案吗?还是我应该抛弃它?

编辑:

从你们两个人的帮助下我得到了这个:

args=zip(f,cycle(dbs))
Out[-]: 
[('f1', 'db1'),
 ('f2', 'db2'),
 ('f3', 'db3'),
 ('f4', 'db4'),
 ('f5', 'db1'),
 ('f6', 'db2'),
 ('f7', 'db3'),
 ('f8', 'db4'),
 ('f9', 'db1'),
 ('f10', 'db2'),
 ('f11', 'db3'),
 ('f12', 'db4')]
Run Code Online (Sandbox Code Playgroud)

所以这里它将如何工作,我将数据库连接代码移动到主要级别并执行此操作:

def process_and_insert(args):

    #Table Definations
    args[1].table.insert(**parse_file(args[0]))
    return True

if __name__=="__main__":
    file_list=os.listdir(".")
    P = Pool(processes=4)

    dbs = [DAL("path_to_mysql/database") for i in range(0,3)]
    args=zip(file_list,cycle(dbs))
    P.map(insert_and_process,args) …
Run Code Online (Sandbox Code Playgroud)

python concurrency multithreading multiprocessing

10
推荐指数
3
解决办法
3万
查看次数