Dav*_*ang 5 python parallel-processing numpy multiprocessing python-multiprocessing
import multiprocessing
import numpy as np
import multiprocessing as mp
import ctypes
class Test():
def __init__(self):
shared_array_base = multiprocessing.Array(ctypes.c_double, 100, lock=False)
self.a = shared_array = np.ctypeslib.as_array(shared_array_base)
def my_fun(self,i):
self.a[i] = 1
if __name__ == "__main__":
num_cores = multiprocessing.cpu_count()
t = Test()
def my_fun_wrapper(i):
t.my_fun(i)
with mp.Pool(num_cores) as p:
p.map(my_fun_wrapper, np.arange(100))
print(t.a)
Run Code Online (Sandbox Code Playgroud)
在上面的代码中,我尝试编写一段代码来修改数组,使用multiprocessing. 在每个进程中执行的函数应该修改作为参数传递给索引处的my_fun()数组的值。关于上面的代码,我想知道复制的是什么。a[:]imy_fun()
1)每个进程都复制代码中的任何内容吗?我认为该物体可能是,但理想情况下什么都不是。
2)有没有办法绕过my_fun()对对象使用包装函数?
几乎代码中的所有内容都会被复制,除了您分配的共享内存multiprocessing.Array。multiprocessing充满了不直观的、隐含的副本。
当您在 中生成新进程时multiprocessing,新进程需要原始进程中几乎所有内容的自己的版本。根据平台和设置的不同,处理方式有所不同,但我们可以告诉您正在使用“fork”模式,因为您的代码在“spawn”或“forkserver”模式下无法工作 - 您会收到有关工作人员不工作的错误能够找到my_fun_wrapper。(Windows 仅支持“spawn”,因此我们可以判断您不在 Windows 上。)
在“fork”模式下,这个初始副本是通过使用fork系统调用来要求操作系统复制整个进程及其内部的所有内容来进行的。由 分配的内存multiprocessing.Array是一种“外部”内存,不会被复制,但大多数其他内容都会被复制。(还有写时复制优化,但写时复制仍然表现得好像所有内容都已复制,并且由于引用计数更新,该优化在 Python 中效果不佳。)
当您将任务分派给工作进程时,multiprocessing需要制作更多副本。任何参数以及任务本身的可调用项都是主进程中的对象,并且对象本质上只存在于一个进程中。工人们无法访问其中的任何内容。他们需要自己的版本。multiprocessing通过腌制可调用对象和参数、通过进程间通信发送序列化字节以及取消腌制工作程序中的腌菜来处理第二轮副本。
当 master pickles 时,pickle 只是说“在模块中my_fun_wrapper查找函数”,而工作人员则查找他们的版本来取消它。查找由 fork 生成的全局, 并在工作进程中,fork 生成一个 ,其中包含由您在原始调用中分配的共享内存支持的数组。my_fun_wrapper__main__my_fun_wrappermy_fun_wrappertttmultiprocessing.Array
另一方面,如果您尝试传递t.my_fun给p.map,则multiprocessing必须 pickle 和 unpickle 方法对象。生成的 pickle 并没有说“查找t全局变量并获取其my_fun方法”。pickle 表示构建一个新 Test实例并获取其 my_fun方法。pickle 中没有任何有关使用您分配的共享内存的指令,并且生成的Test实例及其数组独立于您想要修改的原始数组。
我知道没有什么好方法可以避免需要某种包装函数。