The code looks like this:
import multiprocessing as mp
from functools import partial

import boto3
import numpy as np

s3 = boto3.client('s3')

def _something(**kwargs):
    # Some mixed integer programming stuff related to the variable archive
    return np.array(some_variable_related_to_archive)

def do(s3):
    archive = np.load(s3.get_object('some_key'))  # Simplified -- details not relevant
    pool = mp.Pool()
    sub_process = partial(_something, slack=0.1)
    parts = np.array_split(archive, some_int)
    target_parts = np.array(things)
    out = pool.starmap(sub_process, [x for x in zip(parts, target_parts)])  # Error occurs at this line
    pool.close()
    pool.join()

do(s3)
Error:
_pickle.PicklingError: Can't pickle …

I ran into a pickling error when working with multiprocessing:
from multiprocessing import Pool

def test_func(x):
    return x**2

class Test:
    @classmethod
    def func(cls, x):
        return x**2

def mp_run(n, func, args):
    return Pool(n).map(func, args)

if __name__ == '__main__':
    args = range(1,6)

    print mp_run(5, test_func, args)
    # [1, 4, 9, 16, 25]

    print mp_run(5, Test.func, args)
    """
    Exception in thread Thread-3:
    Traceback (most recent call last):
      File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner
        self.run()
      File "/usr/lib64/python2.6/threading.py", line 484, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/usr/lib64/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
        put(task)
    PicklingError: Can't pickle …

I have the following code.
def main():
    (minI, maxI, iStep, minJ, maxJ, jStep, a, b, numProcessors) = sys.argv
    for i in range(minI, maxI, iStep):
        for j in range(minJ, maxJ, jStep):
            p = multiprocessing.Process(target=functionA, args=(minI, minJ))
            p.start()
    def functionB((a, b)):
        subprocess.call('program1 %s %s %s %s %s %s' %(c, a, b, 'file1',
                        'file2', 'file3'), shell=True)
        for d in ['a', 'b', 'c']:
            subprocess.call('program2 %s %s %s %s %s' %(d, 'file4', 'file5',
                            'file6', 'file7'), shell=True)
    abProduct = list(itertools.product(range(0, 10), range(0, 10)))
    pool = multiprocessing.Pool(processes=numProcessors)
    pool.map(functionB, abProduct)
It produces the following error.
Exception …

So I am trying to speed up my computation time by doing a little multiprocessing.
I am trying to use a pool of workers.
At the top of my code I have
import Singal as s
import multiprocessing as mp

def wrapper(Channel):
    Noise_Frequincies = []
    for i in range(1,125):
        Noise_Frequincies.append(60.0*float(i))
    Noise_Frequincies.append(180.0)
    filter1 = s.Noise_Reduction(Sample_Rate,Noise_Frequincies,Channel)
    return filter1
Then, when the time comes, I use
Both_Channels = [Chan1, Chan2]
results = mp.Pool(2).map(wrapper,Both_Channels)
filter1 = results[0]
filter2 = results[1]
I get the following error
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 808, in __bootstrap_inner
    self.run()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 761, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup …

This (heavily simplified) example works fine (Python 2.6.6, Debian Squeeze):
from multiprocessing import Pool
import numpy as np

src=None

def process(row):
    return np.sum(src[row])

def main():
    global src
    src=np.ones((100,100))

    pool=Pool(processes=16)
    rows=pool.map(process,range(100))
    print rows

if __name__ == "__main__":
    main()
However, after years of being taught that global state is bad!!!, all my instincts tell me I would really rather write something closer to:
from multiprocessing import Pool
import numpy as np

def main():
    src=np.ones((100,100))

    def process(row):
        return np.sum(src[row])

    pool=Pool(processes=16)
    rows=pool.map(process,range(100))
    print rows

if __name__ == "__main__":
    main()
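But of course that doesn't work (it hangs, unable to pickle something).
The example here is trivial, but by the time you add multiple "process" functions, each depending on multiple additional inputs... it all becomes reminiscent of something written in BASIC 30 years ago. Trying to use classes to at least aggregate the state with the appropriate functions seems an obvious solution, but it does not seem to be that easy in practice.
Is there some recommended pattern or style for using multiprocessing.pool that avoids the proliferation of global state needed to support each function I want to map in parallel?
How do experienced "multiprocessing pros" deal with this?
Update: note that I am actually interested in processing much bigger arrays, so variations on the above that send src with every call/iteration are not nearly as good as ones that distribute it once to the pool's worker processes.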
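
One pattern often suggested for this situation is the Pool initializer: the input is handed to each worker once at start-up instead of being sent with every task. The sketch below is only an illustration under assumptions, not the asker's code. It assumes Python 3, the names `init_worker` and `_worker_state` are hypothetical, and a plain list of lists stands in for the NumPy array so that only the standard library is needed.

```python
from multiprocessing import Pool

# Per-process storage, filled once by the initializer in each worker.
_worker_state = {}


def init_worker(src):
    """Runs once in every worker process and stores the shared input."""
    _worker_state["src"] = src


def process(row):
    """Sum one row of the data installed by init_worker."""
    return sum(_worker_state["src"][row])


def main():
    # Stand-in for np.ones((100, 100)); the data is created locally, not globally.
    src = [[1.0] * 100 for _ in range(100)]
    with Pool(processes=4, initializer=init_worker, initargs=(src,)) as pool:
        rows = pool.map(process, range(100))
    print(rows[:5])


if __name__ == "__main__":
    main()
```

This still leaves one module-level dictionary, but the data itself is built inside main(), and every additional "process" function can read from the same dictionary rather than needing a global of its own.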
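
I am running into some attribute lookup problems when trying to instantiate a class inside my RDD.
My workflow:
1- Start with an RDD
2- Take each element of the RDD and instantiate an object for each one
3- Reduce (I will write a method that defines the reduce operation later on)
Here is #2:
>class test(object):
    def __init__(self, a,b):
        self.total = a + b
>a = sc.parallelize([(True,False),(False,False)])
>a.map(lambda (x,y): test(x,y))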
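This is the error I get:
PicklingError: Can't pickle <class '__main__.test'>: attribute lookup __main__.test failed
I would like to know whether there is any workaround. Please answer with a working example that achieves the intended result (i.e. creating an RDD of objects of class "test").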
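
As far as I can tell, the message follows from how pickle treats classes: an instance is stored as a reference to its class (module name plus class name) together with its attributes, so the process that unpickles it must be able to import that class. The plain-Python sketch below only illustrates that behaviour; it does not use Spark, and the class name `Test` is a stand-in. A commonly suggested workaround, given here as an assumption rather than something verified on a cluster, is to move the class into its own module file and ship that file to the executors (for example with `sc.addPyFile`) so that workers can import it.

```python
import pickle


class Test:
    def __init__(self, a, b):
        self.total = a + b


payload = pickle.dumps(Test(True, False))

# The pickle holds the module name, the class name and the instance
# attributes, but not the body of the class itself.
has_module = Test.__module__.encode() in payload
has_class_name = b"Test" in payload
has_attribute = b"total" in payload
print(has_module, has_class_name, has_attribute)
```

Separately, `lambda (x,y): ...` relies on tuple parameter unpacking, which exists only in Python 2; under Python 3 the lambda would need to take one argument and index into it.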
Related questions:
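
Maybe someone more familiar with Python's multiprocessing Pool code can help me out. I am trying to connect to several hosts on my network simultaneously (N at any one time) over a socket connection and execute some RPCs. As one host finishes, I want to add the next host to the pool, and keep going until all hosts are done.
I have a class, HClass, with some methods to do this, and a list of hostnames contained in hostlist. But I have not found any example on docs.python.org that gets this working.
A short snippet of code to illustrate what I have so far: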
hostlist = [h1, h2, h3, h4, ....]
poolsize = 2

class HClass:
    def __init__(self, hostname="default"):
        self.hostname = hostname

    def go(self):
        # do stuff
        # do more stuff
        ....

if __name__ == "__main__":
    objs = [HClass(hostname=current_host) for current_host in hostlist]
    pool = multiprocessing.Pool(poolsize)
    results = pool.apply_async(objs.go())
So far I have been blessed with this traceback:
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task) …

Instance methods cannot be pickled automatically in either Python 2 or Python 3.
I need to pickle instance methods with Python 3, and I ported Steven Bethard's example code to Python 3:
import copyreg
import types

def _pickle_method(method):
    func_name = method.__func__.__name__
    obj = method.__self__
    cls = method.__self__.__class__
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copyreg.pickle(types.MethodType, _pickle_method, _unpickle_method)
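Is this approach foolproof for pickling instance methods, or can something go horribly wrong? I have tested it with some mock classes and everything seems to work.
If nothing can go wrong, why is it not possible to pickle instance methods by default in Python 3?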
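
For comparison: on Python 3.4 and later, as far as I know, bound methods of picklable instances can be pickled without any copyreg registration, because the method object reduces to "look this attribute up on the pickled instance". The sketch below assumes Python 3.4+ and uses a hypothetical module-level class `Greeter`; it says nothing about lambdas, nested functions, or instances that hold unpicklable state.

```python
import pickle


class Greeter:
    def __init__(self, name):
        self.name = name

    def greet(self, punctuation):
        return "hello " + self.name + punctuation


bound = Greeter("world").greet

# No copyreg hook is registered: the bound method is pickled as a
# reference to the attribute 'greet' on the pickled Greeter instance.
restored = pickle.loads(pickle.dumps(bound))
print(restored("!"))
```

Note that the instance travels with the method, so whatever the instance contains has to be picklable too.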
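
I am trying to use the multiprocessing package inside an rpyc service, but I get ValueError: pickling is disabled when I try to call the exposed function from the client. I understand that the multiprocessing package uses pickling to pass information between processes, and that pickling is not allowed in rpyc because it is an insecure protocol. So I am unsure what the best way to use multiprocessing with rpyc is (or whether there is one at all). How can I use multiprocessing inside an rpyc service? Here is the server-side code: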
import rpyc
from multiprocessing import Pool

class MyService(rpyc.Service):

    def exposed_RemotePool(self, function, arglist):
        pool = Pool(processes = 8)
        result = pool.map(function, arglist)
        pool.close()
        return result

if __name__ == "__main__":
    from rpyc.utils.server import ThreadedServer
    t = ThreadedServer(MyService, port = 18861)
    t.start()
Here is the client-side code that produces the error:
import rpyc

def square(x):
    return x*x

c = rpyc.connect("localhost", 18861)
result = c.root.exposed_RemotePool(square, [1,2,3,4])
print(result)

I need to speed up my Python application. The solution should be trivial:
import time
from multiprocessing import Pool

class A:
    def method1(self):
        time.sleep(1)
        print('method1')
        return 'method1'

    def method2(self):
        time.sleep(1)
        print('method2')
        return 'method2'

    def method3(self):
        pool = Pool()
        time1 = time.time()
        res1 = pool.apply_async(self.method1, [])
        res2 = pool.apply_async(self.method2, [])
        res1 = res1.get()
        res2 = res2.get()
        time2 = time.time()
        print('res1 = {0}'.format(res1))
        print('res2 = {0}'.format(res2))
        print('time = {0}'.format(time2 - time1))

a = A()
a.method3()
But every time I launch this simple program, I get an exception:
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python3.2/threading.py", line 740, in _bootstrap_inner
    self.run() …

Utility
import concurrent.futures

def exec_multiprocessing(self, method, args):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # use the executor created above; no name "pool" is defined here
        results = executor.map(method, args)
        return results
clone.py
def clone_vm(self, name, first_run, host, ip):
    # clone stuff
invoke.py
exec_args = [(name, first_run, host, ip) for host, ip in zip(hosts, ips)]
results = self.util.exec_multiprocessing(self.clone.clone_vm, exec_args)
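The code above gives a pickling error. I found that this is because we are passing an instance method, so we should unpickle the instance method. But I have not been able to make that work.
Note: I cannot create a top-level method to avoid this. I have to use instance methods.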
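
One approach that may work while keeping instance methods, assuming Python 3.4 or later (where bound methods are pickled together with their instance), is to make the instance itself picklable by leaving unpicklable members out of its state. The sketch below is built on assumptions: `Cloner` is a hypothetical stand-in for the class that owns clone_vm, a `threading.Lock` plays the role of whatever attribute cannot be pickled, and the host names and IP addresses are made up. Each argument tuple is unpacked with `submit(method, *a)` so that clone_vm receives four separate arguments.

```python
import concurrent.futures
import threading


class Cloner:
    """Hypothetical stand-in for the class that owns clone_vm."""

    def __init__(self):
        self._lock = threading.Lock()  # example of state pickle cannot copy

    def __getstate__(self):
        # Leave the lock out of the pickled state.
        state = self.__dict__.copy()
        del state["_lock"]
        return state

    def __setstate__(self, state):
        # Rebuild the lock in the process that unpickles the instance.
        self.__dict__.update(state)
        self._lock = threading.Lock()

    def clone_vm(self, name, first_run, host, ip):
        with self._lock:
            return "{} cloned on {} ({}), first_run={}".format(
                name, host, ip, first_run)


def exec_multiprocessing(method, args):
    """Run method(*a) for every tuple a in args, in worker processes."""
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(method, *a) for a in args]
        return [f.result() for f in futures]


def main():
    hosts, ips = ["h1", "h2"], ["10.0.0.1", "10.0.0.2"]  # made-up values
    exec_args = [("vm", True, host, ip) for host, ip in zip(hosts, ips)]
    for line in exec_multiprocessing(Cloner().clone_vm, exec_args):
        print(line)


if __name__ == "__main__":
    main()
```

The workers operate on copies of the instance, so any attribute changes made inside clone_vm are not visible in the parent process; only the return values come back.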
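
I want to use the multiprocessing package in this code. I tried to call the function create_new_population and distribute the data across 8 processors, but when I do that I get a pickle error.
Normally the function would run like this: self.create_new_population(self.pop_size)
I try to distribute the work like this: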
f= self.create_new_population
pop = self.pop_size/8
self.current_generation = [pool.apply_async(f, pop) for _ in range(8)]
I get
Can't pickle local object 'exhaust.__init__.<locals>.tour_select' or PermissionError: [WinError 5] Access is denied
I have read this thread carefully and tried to get around the error with Steven Bethard's approach, which allows method pickling/unpickling via copyreg:
def _pickle_method(method)
def _unpickle_method(func_name, obj, cls)
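I also tried using the pathos package, without any luck.
I know the code should be called under an if __name__ == '__main__': block, but I would like to know whether this can be done with as few changes to the code as possible.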
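
The first message names a function defined inside `__init__`, and pickle cannot serialize local functions because it looks functions up by their module-level name. A possible small change, sketched here under assumptions, is to move that helper to module level and pass apply_async its arguments as a tuple. Everything in the sketch is hypothetical apart from the names taken from the question (`create_new_population`, `tour_select`, `pop_size`): the selection logic is a placeholder, the class is spelled `Exhaust`, and Python 3 is assumed. The `__main__` guard is kept because, as far as I know, it cannot be dropped on Windows, where worker processes re-import the main module.

```python
import random
from multiprocessing import Pool


def tour_select(population, k, rng):
    """Placeholder tournament selection: best of k random individuals."""
    return max(rng.sample(population, k))


class Exhaust:
    def __init__(self, pop_size):
        self.pop_size = pop_size
        self.population = list(range(pop_size))

    def create_new_population(self, size, seed=0):
        rng = random.Random(seed)
        return [tour_select(self.population, 3, rng) for _ in range(size)]


def main():
    ga = Exhaust(pop_size=80)
    chunk = ga.pop_size // 8  # integer division; "/" would give a float
    with Pool(processes=4) as pool:
        pending = [pool.apply_async(ga.create_new_population, (chunk, seed))
                   for seed in range(8)]
        parts = [p.get() for p in pending]  # .get() returns each result
    current_generation = [ind for part in parts for ind in part]
    print(len(current_generation))


if __name__ == "__main__":
    main()
```

Each task receives a pickled copy of the object, so a large population is copied once per task; for big objects the per-task cost may outweigh the gain from parallelism.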