我想multiprocessing在Python中使用该库.遗憾地multiprocessing使用pickle不支持闭包,lambdas或函数的函数__main__.所有这三个对我来说都很重要
In [1]: import pickle
In [2]: pickle.dumps(lambda x: x)
PicklingError: Can't pickle <function <lambda> at 0x23c0e60>: it's not found as __main__.<lambda>
Run Code Online (Sandbox Code Playgroud)
幸运的是,有dill一个更健壮的泡菜.显然dill在导入时执行魔术以使泡菜工作
In [3]: import dill
In [4]: pickle.dumps(lambda x: x)
Out[4]: "cdill.dill\n_load_type\np0\n(S'FunctionType'\np1 ...
Run Code Online (Sandbox Code Playgroud)
这非常令人鼓舞,特别是因为我无法访问多处理源代码.可悲的是,我仍然无法得到这个非常基本的例子
import multiprocessing as mp
import dill
p = mp.Pool(4)
print p.map(lambda x: x**2, range(10))
Run Code Online (Sandbox Code Playgroud)
为什么是这样?我错过了什么?究竟是multiprocessing+ dill组合的限制是什么?
mrockli@mrockli-notebook:~/workspace/toolz$ python testmp.py
Temporary Edit for J.F Sebastian
mrockli@mrockli-notebook:~/workspace/toolz$ python testmp.py
Exception …Run Code Online (Sandbox Code Playgroud) 使用时遇到此错误pool.map(funct, iterable):
AttributeError: __exit__
Run Code Online (Sandbox Code Playgroud)
否解释,只将堆栈跟踪到模块中的pool.py文件.
以这种方式使用:
with Pool(processes=2) as pool:
pool.map(myFunction, mylist)
pool.map(myfunction2, mylist2)
Run Code Online (Sandbox Code Playgroud)
我怀疑可挑选性可能存在问题(python需要pickle,或将列表数据转换为字节流)但我不确定这是否属实或是否如何调试.
编辑:产生此错误的新格式代码:
def governingFunct(list):
#some tasks
def myFunction():
# function contents
with closing(Pool(processes=2)) as pool:
pool.map(myFunction, sublist)
pool.map(myFunction2, sublist2)
Run Code Online (Sandbox Code Playgroud)
错误产生:
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Run Code Online (Sandbox Code Playgroud) 我必须像这样挑选一个对象数组:
import cPickle as pickle
from numpy import sin, cos, array
tmp = lambda x: sin(x)+cos(x)
test = array([[tmp,tmp],[tmp,tmp]],dtype=object)
pickle.dump( test, open('test.lambda','w') )
Run Code Online (Sandbox Code Playgroud)
它会出现以下错误:
TypeError: can't pickle function objects
Run Code Online (Sandbox Code Playgroud)
有办法吗?
我正在尝试使用多处理编写Python 2.6(OSX)程序,并且我想填充一个超过默认值32767项的Queue.
from multiprocessing import Queue
Queue(2**15) # raises OSError
Run Code Online (Sandbox Code Playgroud)
Queue(32767)工作正常,但任何更高的数字(例如Queue(32768))都失败了OSError: [Errno 22] Invalid argument
这个问题有解决方法吗?
我想运行这样的东西:
from multiprocessing import Pool
import time
import random
class Controler(object):
def __init__(self):
nProcess = 10
pages = 10
self.__result = []
self.manageWork(nProcess,pages)
def BarcodeSearcher(x):
return x*x
def resultCollector(self,result):
self.__result.append(result)
def manageWork(self,nProcess,pages):
pool = Pool(processes=nProcess)
for pag in range(pages):
pool.apply_async(self.BarcodeSearcher, args = (pag, ), callback = self.resultCollector)
print self.__result
if __name__ == '__main__':
Controler()
Run Code Online (Sandbox Code Playgroud)
但代码导致错误:
Exception in thread Thread-1:
Traceback (most recent call last):
File "C:\Python26\lib\threading.py", line 522, in __bootstrap_inner
self.run()
File "C:\Python26\lib\threading.py", line 477, in run
self.__target(*self.__args, **self.__kwargs)
File …Run Code Online (Sandbox Code Playgroud) 这是MWE我正在使用的更大的代码.基本上,它针对位于特定阈值以下的所有值对KDE(内核密度估计)执行蒙特卡洛积分(在该问题上建议积分方法BTW:积分2D核密度估计).
import numpy as np
from scipy import stats
import time
# Generate some random two-dimensional data:
def measure(n):
"Measurement model, return two coupled measurements."
m1 = np.random.normal(size=n)
m2 = np.random.normal(scale=0.5, size=n)
return m1+m2, m1-m2
# Get data.
m1, m2 = measure(20000)
# Define limits.
xmin = m1.min()
xmax = m1.max()
ymin = m2.min()
ymax = m2.max()
# Perform a kernel density estimate on the data.
x, y = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
values = np.vstack([m1, …Run Code Online (Sandbox Code Playgroud) 我有一个庞大的列表,我需要处理,这需要一些时间,所以我把它分成4件,并用一些功能多处理每件.使用4个内核运行仍然需要一些时间,所以我想我会在函数中添加一些进度条,以便它可以告诉我处理列表时每个处理器的位置.
我的梦想是拥有这样的东西:
erasing close atoms, cpu0 [######..............................] 13%
erasing close atoms, cpu1 [#######.............................] 15%
erasing close atoms, cpu2 [######..............................] 13%
erasing close atoms, cpu3 [######..............................] 14%
Run Code Online (Sandbox Code Playgroud)
每个条随着函数循环的移动而移动.但相反,我得到一个持续的流程:
等等,填满我的终端窗口.
这是调用函数的主要python脚本:
from eraseCloseAtoms import *
from readPDB import *
import multiprocessing as mp
from vectorCalc import *
prot, cell = readPDB('file')
atoms = vectorCalc(cell)
output = mp.Queue()
# setup mp to erase grid atoms that are too close to the protein (dmin = 2.5A)
cpuNum = 4
tasks = len(atoms)
rangeSet = …Run Code Online (Sandbox Code Playgroud) 实用程序
def exec_multiprocessing(self, method, args):
with concurrent.futures.ProcessPoolExecutor() as executor:
results = pool.map(method, args)
return results
Run Code Online (Sandbox Code Playgroud)
clone.py
def clone_vm(self, name, first_run, host, ip):
# clone stuff
Run Code Online (Sandbox Code Playgroud)
invoke.py
exec_args = [(name, first_run, host, ip) for host, ip in zip(hosts, ips)]
results = self.util.exec_multiprocessing(self.clone.clone_vm, exec_args)
Run Code Online (Sandbox Code Playgroud)
上面的代码给出了酸洗错误。我发现这是因为我们正在传递实例方法。因此,我们应该解开实例方法。但是我无法使其工作。
注意:我无法创建顶级方法来避免这种情况。我必须使用实例方法。
下面的结果是我有一个令人尴尬的并行for循环,我试图解决.解释这个问题还有一些麻烦,但尽管所有问题都很冗长,但我认为这应该是一个相当简单的问题,多处理模块的设计很容易解决.
我有一个由k个不同函数组成的大长度N数组,以及一个长度为N的abcissa数组.由于@senderle提供的聪明的解决方案在高效算法中描述,用于评估相同长度的1d numpy数组上的1-d函数数组,我有一个快速的基于numpy的算法,我可以用它来评估abcissa的函数返回长度为N的纵坐标数组:
def apply_indexed_fast(abcissa, func_indices, func_table):
""" Returns the output of an array of functions evaluated at a set of input points
if the indices of the table storing the required functions are known.
Parameters
----------
func_table : array_like
Length k array of function objects
abcissa : array_like
Length Npts array of points at which to evaluate the functions.
func_indices : array_like
Length Npts array providing the indices to use to choose which …Run Code Online (Sandbox Code Playgroud) python parallel-processing performance numpy scientific-computing
我有一个CPU密集型功能:
def entity_intersections(ent, collidable):
intersections = []
for line1, line2 in product(ent.shape, collidable.shape):
pair_intersections = find_intersections(line1 + ent.position, ent.velocity, ent.acceleration, line2 + collidable.position, collidable.velocity, collidable.acceleration, ent, collidable)
intersections.extend(pair_intersections)
return intersections
Run Code Online (Sandbox Code Playgroud)
我希望让所有调用find_intersections并行运行,以便它们执行得更快,同时仍然将所有结果收集在一起(一旦所有执行完成).什么库允许我这样做,因为这find_intersections是一个纯函数?
将非常感谢如何生成这些并行执行以及将结果收集在一起的示例.
我一直试图挑选一个包含对静态类方法的引用的对象.Pickle失败(例如打开module.MyClass.foo)说明它不能被腌制,因为module.foo不存在.
我提出了以下解决方案,使用包装器对象在调用时定位函数,保存容器类和函数名称:
class PicklableStaticMethod(object):
"""Picklable version of a static method.
Typical usage:
class MyClass:
@staticmethod
def doit():
print "done"
# This cannot be pickled:
non_picklable = MyClass.doit
# This can be pickled:
picklable = PicklableStaticMethod(MyClass.doit, MyClass)
"""
def __init__(self, func, parent_class):
self.func_name = func.func_name
self.parent_class = parent_class
def __call__(self, *args, **kwargs):
func = getattr(self.parent_class, self.func_name)
return func(*args, **kwargs)
Run Code Online (Sandbox Code Playgroud)
我想知道,是否有更好的 - 更标准的方法 - 来腌制这样的物体?我不想对全局pickle进程进行更改(copy_reg例如使用),但以下模式会很棒:class MyClass(object):@ patch_staticmethod def foo():print"done".
我对此的尝试是不成功的,特别是因为我无法从foo函数中提取所有者类.我甚至愿意接受明确的规范(例如@picklable_staticmethod(MyClass)),但我不知道有什么方法可以 …
我试图将我的脚本从使用线程转换为更酷的多处理(使用python 3.2和concurrent.futures,但是这段代码崩溃了
with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
for result in executor.map(lambda h:
validate_hostname(h, pci_ids, options.verbose),
get_all_hostnames()):
Run Code Online (Sandbox Code Playgroud)
我收到错误_pickle.PicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed.在阅读这个答案时,我认为问题在于不可能将lambda函数作为参数,executor.map()并且为了使executor.map()我需要开发一个参数函数,但是那些pci_ids并且options.verbose是变量所以我不能将它们指定为固定的帮助功能中的值.
有什么想法怎么办?
python ×12
numpy ×4
pickle ×4
lambda ×2
performance ×2
arrays ×1
concurrency ×1
dill ×1
max-size ×1
montecarlo ×1
pool ×1
python-2.7 ×1
python-3.x ×1
python-click ×1
queue ×1