如何使用初始化程序来设置我的多进程池?

Chr*_*vey 51 python multiprocessing

我正在尝试使用多进程池对象.我希望每个进程在启动时打开数据库连接,然后使用该连接来处理传入的数据.(而不是打开和关闭每个数据位的连接.)这似乎是初始化程序是什么因为,但我无法理解工人和初始化者的沟通方式.所以我有这样的事情:

def get_cursor():
  return psycopg2.connect(...).cursor()

def process_data(data):
   # here I'd like to have the cursor so that I can do things with the data

if __name__ == "__main__":
  pool = Pool(initializer=get_cursor, initargs=())
  pool.map(process_data, get_some_data_iterator())
Run Code Online (Sandbox Code Playgroud)

我如何(或我)将光标从get_cursor()返回到process_data()?

tor*_*rek 86

因此调用initialize函数:

def worker(...):
    ...
    if initializer is not None:
        initializer(*args)
Run Code Online (Sandbox Code Playgroud)

所以在任何地方都没有保存任何返回值.你可能认为这会让你失望,但不是!每个工人都在一个单独的过程中.因此,您可以使用普通global变量.

这不是很漂亮,但它有效:

cursor = None
def set_global_cursor(...):
    global cursor
    cursor = ...
Run Code Online (Sandbox Code Playgroud)

现在你可以cursor在你的process_data功能中使用了.cursor每个单独进程中的变量与所有其他进程分开,因此它们不会相互衔接.

(我不知道是否psycopg2有一种不同的处理方式,不涉及multiprocessing首先使用;这是对multiprocessing模块的一般问题的一般答案.)

  • @TheUnfunCat:不知道“ init_worker”是什么(我在您的回答中看到一个,但原始问题中没有),我不能确定地说。一般的想法是允许multiprocess.Pool创建一个进程池,并让每个进程创建数据库连接(它自己的私有副本)。如果希望在启动池进程时发生这种情况,请使用初始化函数。如果您希望以后发生,可以稍后再做。无论哪种方式,您都需要一个持久变量,例如您的方法中的function.cursor,或一个普通的global。 (2认同)
  • 无论如何,我发现我和你的解决方案都很丑陋而且有点神奇(我相信 pylint 也会抱怨)。我想知道是否有更pythonic的方式...... (2认同)

yee*_*lan 13

torek已经很好地解释了为什么初始化程序在这种情况下不起作用.但是,我个人并不是Global变量的粉丝,所以我想在这里粘贴另一个解决方案.

我们的想法是使用一个类来包装函数并使用"global"变量初始化类.

class Processor(object):
  """Process the data and save it to database."""

  def __init__(self, credentials):
    """Initialize the class with 'global' variables"""
    self.cursor = psycopg2.connect(credentials).cursor()

  def __call__(self, data):
    """Do something with the cursor and data"""
    self.cursor.find(data.key)
Run Code Online (Sandbox Code Playgroud)

然后打电话给

p = Pool(5)
p.map(Processor(credentials), list_of_data)
Run Code Online (Sandbox Code Playgroud)

因此,第一个参数使用凭证初始化类,返回类的实例并使用数据映射调用实例.

虽然这不像全局变量解决方案那么简单,但我强烈建议避免全局变量并以某种安全的方式封装变量.(我真的希望有一天他们可以支持lambda表达,它会让事情变得更容易......)

  • 它**通常很好避免全局变量,你可以做这样的事情,但你会推迟初始化`self.cursor`,直到`p.map`实际上已经启动流程实例.也就是说,你的`__init__`只会把它设置为`None`而`__call__`会说'如果self.cursor是None:self.cursor = ...`.最后,我们真正需要的是一个单进程单例. (13认同)
  • 此解决方案无法获得与使用全局变量相同的结果。每当map(...)将一个任务从list_of_data移交给Processor .__ call __()时,整个Processor对象都会被腌制,并作为第一个参数传递给__call __(self,data)。 b / c这是一个实例方法。即使psycopg2.connection.Cursor()对象是可腌制的,您也无法初始化任何变量,而是腌制该对象,然后从__call __()中的self实例中访问它。在子进程中。另外,如果“处理器”上的任何对象很大,则此解决方案将减慢爬网速度。 (4认同)
  • 我喜欢这个答案,因为它很漂亮,但它不会为列表中的每个项目重新连接吗? (3认同)
  • 这是否会导致为每个任务重新运行初始化程序(可能在池中的每个进程多于一次)? (3认同)
  • 如果初始化很耗时,这个答案基本上是序列化初始化,这是一个错误的答案.此外,一些时间初始化不能在一个进程中完成两次. (3认同)

The*_*Cat 11

您还可以将函数发送到初始化程序并在其中创建连接.然后将光标添加到该函数.

def init_worker(function):
    function.cursor = db.conn()
Run Code Online (Sandbox Code Playgroud)

现在您可以通过function.cursor访问db而不使用globals,例如:

def use_db(i):
    print(use_db.cursor) #process local
pool = Pool(initializer=init_worker, initargs=(use_db,))
pool.map(use_db, range(10))
Run Code Online (Sandbox Code Playgroud)

  • 我喜欢这个答案,因为它不会为每个调用传递初始化参数。如果初始值设定项参数很大,那么我不希望它们在每次调用时都被腌制。 (3认同)
  • 您的过程命令是否类似于:p = Pool(initializer = init_worker,args =(func)); p.map(func,args_set); ?? (2认同)
  • 我不明白这个答案。SQL逻辑将在哪里执行? (2认同)

mcg*_*uip 5

鉴于在初始化器中定义全局变量通常是不可取的,因此我们可以避免使用它们,并且还可以避免在每个调用中通过在每个子进程中进行简单的缓存来重复进行昂贵的初始化:

from functools import lru_cache
from multiprocessing.pool import Pool
from time import sleep


@lru_cache(maxsize=None)
def _initializer(a, b):
    print(f'Initialized with {a}, {b}')


def _pool_func(a, b, i):
    _initializer(a, b)
    sleep(1)
    print(f'got {i}')


arg_a = 1
arg_b = 2

with Pool(processes=5) as pool:
    pool.starmap(_pool_func, ((arg_a, arg_b, i) for i in range(0, 20)))
Run Code Online (Sandbox Code Playgroud)

输出:

Initialized with 1, 2
Initialized with 1, 2
Initialized with 1, 2
Initialized with 1, 2
Initialized with 1, 2
got 1
got 0
got 4
got 2
got 3
got 5
got 7
got 8
got 6
got 9
got 10
got 11
got 12
got 14
got 13
got 15
got 16
got 17
got 18
got 19
Run Code Online (Sandbox Code Playgroud)

  • 这只会节省您在初始化程序中扩展的计算。相反,如果您的初始化程序主要包括在主进程和工作进程之间传输大量数据,那么它对您没有帮助,与上述解决方案相反。 (2认同)