mat*_*asq 26 python multiprocessing
我在多处理模块中使用Pool.map_async()(以及Pool.map())时遇到问题.我已经实现了一个并行循环函数,只要函数输入到Pool.map_async是一个"常规"函数,它就能正常工作.当函数是例如类的方法时,我得到一个PicklingError:
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Run Code Online (Sandbox Code Playgroud)
我只使用Python进行科学计算,所以我对酸洗的概念不太熟悉,今天就对它进行了一些了解.我在使用python的多处理Pool.map()时已经看过几个先前的答案,比如无法选择<type'instancemethod'>,但我无法弄清楚如何使它工作,即使遵循提供的链接回答.
我的代码,目标是使用多个内核模拟Normal rv的向量.请注意,这只是一个示例,可能甚至没有在多个内核上运行的回报.
import multiprocessing as mp
import scipy as sp
import scipy.stats as spstat
def parfor(func, args, static_arg = None, nWorkers = 8, chunksize = None):
"""
Purpose: Evaluate function using Multiple cores.
Input:
func - Function to evaluate in parallel
arg - Array of arguments to evaluate func(arg)
static_arg - The "static" argument (if any), i.e. the variables that are constant in the evaluation of func.
nWorkers - Number of Workers to process computations.
Output:
func(i, static_arg) for i in args.
"""
# Prepare arguments for func: Collect arguments with static argument (if any)
if static_arg != None:
arguments = [[arg] + static_arg for arg in list(args)]
else:
arguments = args
# Initialize workers
pool = mp.Pool(processes = nWorkers)
# Evaluate function
result = pool.map_async(func, arguments, chunksize = chunksize)
pool.close()
pool.join()
return sp.array(result.get()).flatten()
# First test-function. Freeze location and scale for the Normal random variates generator.
# This returns a function that is a method of the class Norm_gen. Methods cannot be pickled
# so this will give an error.
def genNorm(loc, scale):
def subfunc(a):
return spstat.norm.rvs(loc = loc, scale = scale, size = a)
return subfunc
# Second test-function. The same as above but does not return a method of a class. This is a "plain" function and can be
# pickled
def test(fargs):
x, a, b = fargs
return spstat.norm.rvs(size = x, loc = a, scale = b)
# Try it out.
N = 1000000
# Set arguments to function. args1 = [1, 1, 1,... ,1], the purpose is just to generate a random variable of size 1 for each
# element in the output vector.
args1 = sp.ones(N)
static_arg = [0, 1] # standarized normal.
# This gives the PicklingError
func = genNorm(*static_arg)
sim = parfor(func, args1, static_arg = None, nWorkers = 12, chunksize = None)
# This is OK:
func = test
sim = parfor(func, args1, static_arg = static_arg, nWorkers = 12, chunksize = None)
Run Code Online (Sandbox Code Playgroud)
在使用python的多处理Pool.map()时,在关于无法选择<type'instancemethod'>的问题的答案中提供链接后,Steven Bethard(几乎在最后)建议使用copy_reg模块.他的代码是:
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
import copy_reg
import types
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
Run Code Online (Sandbox Code Playgroud)
我真的不明白如何利用这一点.我唯一想到的就是把它放在我的代码之前,但它没有帮助.一个简单的解决方案当然是与有效的方法一起使用,避免涉及copy_reg.我更感兴趣的是让copy_reg正常工作以充分利用多处理,而不必每次都解决问题.
感谢您的帮助,非常感谢.
马蒂亚斯
jsb*_*eno 19
这里的问题是"pickle"错误信息少于概念:多进程确实将你的代码分叉到"worker"不同的进程中以执行其魔术.
然后,它通过无缝地序列化和反序列化数据(即使用pickle的部分)将数据发送到不同进程和从不同进程发送数据.
当来回传递的部分数据是一个函数时 - 它假定在被调用者进程中存在一个具有相同名称的函数,并且(我猜)将函数名称作为字符串传递.由于函数是无状态的,被调用的工作进程只是用它收到的数据调用相同的函数.(Python函数不能通过pickle序列化,因此只需在master和worker进程之间传递引用)
当你的函数是一个实例中的方法时 - 虽然当我们编写python代码时,它就像一个函数一样,带有一个"自动" self
变量,它在下面是不一样的.因为实例(对象)是有状态的.这意味着工作进程没有对象的副本,该对象是您要在另一侧调用的方法的所有者.
解决将方法作为函数传递给map_async调用的方法也不起作用 - 因为多进程只使用函数引用,而不是传递它时的实际函数.
因此,您应该(1)更改代码,以便将函数 - 而不是方法 - 传递给工作进程,将对象保留的任何状态转换为要调用的新参数.(2)为map_async调用创建一个"目标"函数,该函数重构工作进程端所需的对象,然后调用其中的函数.Python中大多数简单的类本身都是可选择的,因此您可以在map_async调用上传递函数所有者本身的对象 - 而"target"函数将在工作方调用适当的方法本身.
(2)可能听起来"很难",但它可能只是这样 - 除非你的对象的课程不能被腌制:
import types
def target(object, *args, **kw):
method_name = args[0]
return getattr(object, method_name)(*args[1:])
(...)
#And add these 3 lines prior to your map_async call:
# Evaluate function
if isinstance (func, types.MethodType):
arguments.insert(0, func.__name__)
func = target
result = pool.map_async(func, arguments, chunksize = chunksize)
Run Code Online (Sandbox Code Playgroud)
*免责声明:我没有测试过这个
归档时间: |
|
查看次数: |
21298 次 |
最近记录: |