I know about the various discussions of the limitations of the multiprocessing module when dealing with functions that are data members of a class (due to pickling problems).
But is there another module, or any kind of workaround in multiprocessing, that allows something like the following (specifically, without forcing the definition of the function to be applied in parallel to live outside the class)?
class MyClass():

    def __init__(self):
        self.my_args = [1,2,3,4]
        self.output = {}

    def my_single_function(self, arg):
        return arg**2

    def my_parallelized_function(self):
        # Use map or map_async to map my_single_function onto the
        # list of self.my_args, and append the return values into
        # self.output, using each arg in my_args as the key.
        # The result should make self.output become
        # {1:1, 2:4, 3:9, 4:16}

foo = MyClass()
foo.my_parallelized_function()
print foo.output
Note: I could easily do this by moving my_single_function outside the class and passing something like foo.my_args to the map or map_async call. But that pushes the parallelized execution of the function outside of MyClass.
For my application (parallelizing a large data query that retrieves, joins, and cleans monthly cross-sections of data and then appends them into a long time series of cross-sections), it is very important for this functionality to live inside the class, because different users of my program will instantiate different instances of the class, with different time intervals, different time increments, different subsets of data to collect, and so on, all of which should be associated with that instance. Hence, I would also like the work of parallelizing to be done by the instance, since it owns all of the data relevant to the parallelized query, and it would be plain silly to try to write some hacky wrapper function that bundles up some arguments and lives outside the class (especially since such a function would be non-generic; it would need all kinds of details from inside the class).
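For comparison, the rejected workaround described above would look roughly like this minimal sketch, where square_arg is a hypothetical module-level copy of my_single_function and my_args stands in for foo.my_args:

import multiprocessing as mp

def square_arg(arg):
    # Hypothetical module-level stand-in for my_single_function;
    # plain functions like this pickle without any extra machinery.
    return arg**2

if __name__ == '__main__':
    my_args = [1, 2, 3, 4]    # stands in for foo.my_args
    pool = mp.Pool()
    output = dict(zip(my_args, pool.map(square_arg, my_args)))
    print output              # {1: 1, 2: 4, 3: 9, 4: 16}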
Steven Bethard has posted a way to allow methods to be pickled/unpickled. You can use it like this:
import multiprocessing as mp
import copy_reg
import types

def _pickle_method(method):
    # Author: Steven Bethard
    # http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
    if cls_name:
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    # Author: Steven Bethard
    # http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

# This call to copy_reg.pickle allows you to pass methods as the first arg to
# mp.Pool methods. If you comment out this line, `pool.map(self.foo, ...)` results in
# PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
# __builtin__.instancemethod failed
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class MyClass(object):

    def __init__(self):
        self.my_args = [1,2,3,4]
        self.output = {}

    def my_single_function(self, arg):
        return arg**2

    def my_parallelized_function(self):
        # Use map or map_async to map my_single_function onto the
        # list of self.my_args, and append the return values into
        # self.output, using each arg in my_args as the key.
        # The result should make self.output become
        # {1:1, 2:4, 3:9, 4:16}
        self.output = dict(zip(self.my_args,
                               pool.map(self.my_single_function, self.my_args)))
Then
pool = mp.Pool()
foo = MyClass()
foo.my_parallelized_function()
yields
print foo.output
# {1: 1, 2: 4, 3: 9, 4: 16}
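The question also mentions map_async. A minimal sketch, assuming the copy_reg registration, MyClass, and pool defined above: map_async returns an AsyncResult right away, and calling .get() on it blocks until the workers finish.

foo = MyClass()
# map_async returns an AsyncResult immediately instead of blocking;
# .get() waits for all workers and returns the results in order.
async_result = pool.map_async(foo.my_single_function, foo.my_args)
foo.output = dict(zip(foo.my_args, async_result.get()))
print foo.output
# {1: 1, 2: 4, 3: 9, 4: 16}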
If you use a fork of multiprocessing called pathos.multiprocessing, you can directly use classes and class methods in multiprocessing's map functions. This is because dill is used instead of pickle or cPickle, and dill can serialize almost anything in python.
pathos.multiprocessing also provides an asynchronous map function, and its map can take multiple arguments (e.g. map(math.pow, [1,2,3], [4,5,6])).
See: What can multiprocessing and dill do together?
and: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>>
>>> p = Pool(4)
>>>
>>> def add(x,y):
...   return x+y
...
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>>
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>>
>>> class Test(object):
...   def plus(self, x, y):
...     return x+y
...
>>> t = Test()
>>>
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>>
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
So I believe you can do what you want to do.
Python 2.7.8 (default, Jul 13 2014, 02:29:54)
[GCC 4.2.1 Compatible Apple Clang 4.1 ((tags/Apple/clang-421.11.66))] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import dill
>>>
>>> class MyClass():
...   def __init__(self):
...     self.my_args = [1,2,3,4]
...     self.output = {}
...   def my_single_function(self, arg):
...     return arg**2
...   def my_parallelized_function(self):
...     res = p.map(self.my_single_function, self.my_args)
...     self.output = dict(zip(self.my_args, res))
...
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool()
>>>
>>> foo = MyClass()
>>> foo.my_parallelized_function()
>>> foo.output
{1: 1, 2: 4, 3: 9, 4: 16}
>>>
Get the code here: https://github.com/uqfoundation/pathos