Chr*_*vey 51 python multiprocessing
我正在尝试使用多进程池对象.我希望每个进程在启动时打开数据库连接,然后使用该连接来处理传入的数据.(而不是打开和关闭每个数据位的连接.)这似乎是初始化程序是什么因为,但我无法理解工人和初始化者的沟通方式.所以我有这样的事情:
def get_cursor():
return psycopg2.connect(...).cursor()
def process_data(data):
# here I'd like to have the cursor so that I can do things with the data
if __name__ == "__main__":
pool = Pool(initializer=get_cursor, initargs=())
pool.map(process_data, get_some_data_iterator())
Run Code Online (Sandbox Code Playgroud)
我如何(或我)将光标从get_cursor()返回到process_data()?
tor*_*rek 86
因此调用initialize函数:
def worker(...):
...
if initializer is not None:
initializer(*args)
Run Code Online (Sandbox Code Playgroud)
所以在任何地方都没有保存任何返回值.你可能认为这会让你失望,但不是!每个工人都在一个单独的过程中.因此,您可以使用普通global变量.
这不是很漂亮,但它有效:
cursor = None
def set_global_cursor(...):
global cursor
cursor = ...
Run Code Online (Sandbox Code Playgroud)
现在你可以cursor在你的process_data功能中使用了.cursor每个单独进程中的变量与所有其他进程分开,因此它们不会相互衔接.
(我不知道是否psycopg2有一种不同的处理方式,不涉及multiprocessing首先使用;这是对multiprocessing模块的一般问题的一般答案.)
yee*_*lan 13
torek已经很好地解释了为什么初始化程序在这种情况下不起作用.但是,我个人并不是Global变量的粉丝,所以我想在这里粘贴另一个解决方案.
我们的想法是使用一个类来包装函数并使用"global"变量初始化类.
class Processor(object):
"""Process the data and save it to database."""
def __init__(self, credentials):
"""Initialize the class with 'global' variables"""
self.cursor = psycopg2.connect(credentials).cursor()
def __call__(self, data):
"""Do something with the cursor and data"""
self.cursor.find(data.key)
Run Code Online (Sandbox Code Playgroud)
然后打电话给
p = Pool(5)
p.map(Processor(credentials), list_of_data)
Run Code Online (Sandbox Code Playgroud)
因此,第一个参数使用凭证初始化类,返回类的实例并使用数据映射调用实例.
虽然这不像全局变量解决方案那么简单,但我强烈建议避免全局变量并以某种安全的方式封装变量.(我真的希望有一天他们可以支持lambda表达,它会让事情变得更容易......)
The*_*Cat 11
您还可以将函数发送到初始化程序并在其中创建连接.然后将光标添加到该函数.
def init_worker(function):
function.cursor = db.conn()
Run Code Online (Sandbox Code Playgroud)
现在您可以通过function.cursor访问db而不使用globals,例如:
def use_db(i):
print(use_db.cursor) #process local
pool = Pool(initializer=init_worker, initargs=(use_db,))
pool.map(use_db, range(10))
Run Code Online (Sandbox Code Playgroud)
鉴于在初始化器中定义全局变量通常是不可取的,因此我们可以避免使用它们,并且还可以避免在每个调用中通过在每个子进程中进行简单的缓存来重复进行昂贵的初始化:
from functools import lru_cache
from multiprocessing.pool import Pool
from time import sleep
@lru_cache(maxsize=None)
def _initializer(a, b):
print(f'Initialized with {a}, {b}')
def _pool_func(a, b, i):
_initializer(a, b)
sleep(1)
print(f'got {i}')
arg_a = 1
arg_b = 2
with Pool(processes=5) as pool:
pool.starmap(_pool_func, ((arg_a, arg_b, i) for i in range(0, 20)))
Run Code Online (Sandbox Code Playgroud)
输出:
Initialized with 1, 2
Initialized with 1, 2
Initialized with 1, 2
Initialized with 1, 2
Initialized with 1, 2
got 1
got 0
got 4
got 2
got 3
got 5
got 7
got 8
got 6
got 9
got 10
got 11
got 12
got 14
got 13
got 15
got 16
got 17
got 18
got 19
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
21254 次 |
| 最近记录: |