Ano*_*ous 7 python parallel-processing multiprocessing threadpool
我有一个我试图并行化的算法,因为串行运行时间很长。然而,需要并行化的函数在一个类中。multiprocessing.Pool似乎是最好和最快的方式来做到这一点,但有一个问题。它的目标函数不能是对象实例的函数。意思是这个;您可以Pool通过以下方式声明 a :
import multiprocessing as mp
cpus = mp.cpu_count()
poolCount = cpus*2
pool = mp.Pool(processes = poolCount, maxtasksperchild = 2)
Run Code Online (Sandbox Code Playgroud)
然后实际使用它:
pool.map(self.TargetFunction, args)
Run Code Online (Sandbox Code Playgroud)
但这会引发错误,因为无法对对象实例进行腌制,因为该Pool函数确实会将信息传递给其所有子进程。但我必须使用self.TargetFunction
所以我有一个想法,我将创建一个名为的新 Python 文件,parallel并简单地编写几个函数而不将它们放在一个类中,然后从我的原始类(我想并行化其函数)中调用这些函数
所以我试过这个:
import multiprocessing as mp
def MatrixHelper(args):
WM = args[0][0]
print(WM.CreateMatrixMp(*args))
return WM.CreateMatrixMp(*args)
def Start(sigmaI, sigmaX, numPixels, WM):
cpus = mp.cpu_count()
poolCount = cpus * 2
args = [(WM, sigmaI, sigmaX, i) for i in range(numPixels)]
print('Number of cpu\'s to process WM:%d'%cpus)
pool = mp.Pool(processes = poolCount, maxtasksperchild = 2)
tempData = pool.map(MatrixHelper, args)
return tempData
Run Code Online (Sandbox Code Playgroud)
这些函数不是类的一部分,使用MatrixHelperin Poolsmap函数可以正常工作。但我在做这件事时意识到这是没有出路的。需要并行化的函数 ( CreateMatrixMp) 期望将一个对象传递给它(它被声明为def CreateMatrixMp(self, sigmaI, sigmaX, i))
因为它不是从它的类内部调用的,所以它不会self传递给它。为了解决这个问题,我将Start函数传递给调用对象本身。就像,我说parallel.Start(sigmaI, sigmaX, self.numPixels, self)。然后对象self变成WM这样,我将能够最终将所需的函数调用为WM.CreateMatrixMp().
我确信这是一种非常草率的编码方式,但我只是想看看它是否有效。但是不,再次酸洗错误,该map函数根本无法处理任何对象实例。
所以我的问题是,为什么要这样设计?它似乎没用,它似乎在任何使用类的程序中都完全失效。
我尝试使用Process而不是Pool,但这需要我最终写入的数组被共享,这需要进程等待彼此。如果我不希望它被共享,那么我让每个进程编写自己的较小数组,并在最后进行一次大写。但这两种方法的运行时间都比我连续执行时要慢!Python 内置multiprocessing似乎绝对没用!
在我的 tagret 函数在类中的情况下,有人可以就如何通过多处理实际节省时间给我一些指导吗?我已经阅读了此处的帖子以供使用pathos.multiprocessing,但我在 Windows 上,并且正在与具有不同设置的多个人一起从事这个项目。让每个人都尝试安装它会很不方便。
我在尝试在类中使用多处理时遇到了类似的问题。我能够通过在网上找到的相对简单的解决方法来解决这个问题。基本上,您使用类外部的函数来解包/解包您要并行化的函数内的方法。这是我发现的两个网站,解释了如何做到这一点。
对于两者来说,想法是做这样的事情:
rom multiprocessing import Pool
import time
def unwrap_self_f(arg, **kwarg):
return C.f(*arg, **kwarg)
class C:
def f(self, name):
print 'hello %s,'%name
time.sleep(5)
print 'nice to meet you.'
def run(self):
pool = Pool(processes=2)
names = ('frank', 'justin', 'osi', 'thomas')
pool.map(unwrap_self_f, zip([self]*len(names), names))
if __name__ == '__main__':
c = C()
c.run()
Run Code Online (Sandbox Code Playgroud)
工作原理的本质multiprocessing是它产生子进程,这些子进程接收参数来运行某个函数。为了传递这些参数,它需要它们是可传递的:对主进程、sa 套接字、文件描述符和其他低级、操作系统相关的东西来说是非独占的。
这翻译为“需要能够pickle或可序列化”。
对于同一主题,当您(可以)对问题进行独立划分时,并行处理效果最佳。我可以告诉你想要共享某种输入/流/数据库源,但这可能会造成一个瓶颈,你必须在某个时刻解决这个瓶颈(至少是从“python脚本”方面,而不是“操作系统/数据库”方面。幸运的是,您现在必须尽早解决它。
您可以在需要时而不是在开始时重新编码您的类以生成/创建这些不可选取的资源
def targetFunction(self, range_params):
if not self.ready():
self._init_source()
#rest of the code
Run Code Online (Sandbox Code Playgroud)
您有点以相反的方式解决了问题(根据参数初始化一个对象)。是的,并行处理是有成本的。
您可以查看multiprocessing编程指南,以更深入地了解这个问题。