Got*_*tte 4 python methods multiprocessing
最初,我有一个类来存储一些处理过的值,并使用其他方法重用它们.
问题是当我试图将类方法划分为多个进程以加速,python生成的进程但似乎不起作用(正如我在任务管理器中看到的那样只有一个进程正在运行)并且结果永远不会被传递.
我做了几次搜索,发现pathos.multiprocessing可以做到这一点,但我想知道标准库是否可以解决这个问题?
from multiprocessing import Pool
class A():
def __init__(self, vl):
self.vl = vl
def cal(self, nb):
return nb * self.vl
def run(self, dt):
t = Pool(processes=4)
rs = t.map(self.cal, dt)
t.close()
return t
a = A(2)
a.run(list(range(10)))
Run Code Online (Sandbox Code Playgroud)
你的代码失败了,因为它不能pickle是实例方法(self.cal),这是当你通过映射它们来生成多个进程时Python试图做的事情multiprocessing.Pool(好吧,有一种方法可以做到,但它太复杂而且不是非常有用)无论如何) - 由于没有共享内存访问,它必须"打包"数据并将其发送到生成的进程以进行解包.如果你试图挑选a实例,你也会遇到同样的情况.
multiprocessing包中唯一可用的共享内存访问是一个鲜为人知的,multiprocessing.pool.ThreadPool所以如果你真的想这样做:
from multiprocessing.pool import ThreadPool
class A():
def __init__(self, vl):
self.vl = vl
def cal(self, nb):
return nb * self.vl
def run(self, dt):
t = ThreadPool(processes=4)
rs = t.map(self.cal, dt)
t.close()
return rs
a = A(2)
print(a.run(list(range(10))))
# prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Run Code Online (Sandbox Code Playgroud)
但是这不会给你并行化,因为它实际上映射到可以访问共享内存的常规线程.您应该传递类/静态方法(如果需要它们),并附带您希望它们使用的数据(在您的情况下self.vl).如果您需要跨进程共享该数据,则必须使用一些共享内存抽象,例如multiprocessing.Value,当然应用互斥.
UPDATE
我说你可以做到(并且有一些模块或多或少都在做它,pathos.multiprocessing例如检查)但我不认为值得这么麻烦 - 当你到达一个你必须欺骗你的系统做什么的点你想要的,你可能要么使用错误的系统,要么重新考虑你的设计.但为了明智,这里有一种方法可以在多处理设置中执行您想要的操作:
import sys
from multiprocessing import Pool
def parallel_call(params): # a helper for calling 'remote' instances
cls = getattr(sys.modules[__name__], params[0]) # get our class type
instance = cls.__new__(cls) # create a new instance without invoking __init__
instance.__dict__ = params[1] # apply the passed state to the new instance
method = getattr(instance, params[2]) # get the requested method
args = params[3] if isinstance(params[3], (list, tuple)) else [params[3]]
return method(*args) # expand arguments, call our method and return the result
class A(object):
def __init__(self, vl):
self.vl = vl
def cal(self, nb):
return nb * self.vl
def run(self, dt):
t = Pool(processes=4)
rs = t.map(parallel_call, self.prepare_call("cal", dt))
t.close()
return rs
def prepare_call(self, name, args): # creates a 'remote call' package for each argument
for arg in args:
yield [self.__class__.__name__, self.__dict__, name, arg]
if __name__ == "__main__": # important protection for cross-platform use
a = A(2)
print(a.run(list(range(10))))
# prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Run Code Online (Sandbox Code Playgroud)
我认为它是如何工作的非常自我解释,但简而言之它传递了你的类的名称,它的当前状态(sans信号,tho),一个要调用的方法和用于调用它的parallel_call函数的参数每个过程都在Pool.Python会自动对所有这些数据进行pickle和unpickle,因此所有parallel_call需要做的就是重新构建原始对象,在其中找到所需的方法并使用提供的param调用它.
这样我们只传递数据而不尝试传递活动对象,因此Python不会抱怨(在这种情况下,尝试在类参数中添加对实例方法的引用,看看会发生什么)并且一切正常.
如果你想对"魔法"嗤之以鼻,你可以使它看起来与你的代码完全一样(创建自己的Pool处理程序,从函数中提取名称并将名称发送到实际进程等)但这应该起到足够的作用为你的例子.
但是,在你提高你的希望之前,请记住,只有在共享一个"静态"实例(一个在多处理上下文中开始调用它时不会改变其初始状态的实例)之后,这才会起作用.如果A.cal方法是更改vl属性的内部状态- 它将仅影响其更改的实例(除非它在调用Pool之间调用的主实例中更改).如果你想共享状态,你可以升级parallel_call到instance.__dict__调用后接收并将其与方法调用结果一起返回,然后在调用端你必须__dict__用返回的数据更新本地以更改原始状态州.这还不够 - 你实际上必须创建一个共享的dict并处理所有的互斥体工作人员,以便所有进程同时访问它(你可以使用multiprocessing.Manager它).
所以,正如我所说,比它的价值更麻烦......